|
|
|
@ -129,78 +129,128 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
channelz::ChildRefsList* child_channels) override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
/// Contains a call to the LB server and all the data related to the call.
|
|
|
|
|
class BalancerCallState : public InternallyRefCounted<BalancerCallState> { |
|
|
|
|
/// Contains a channel to the LB server and all the data related to the
|
|
|
|
|
/// channel.
|
|
|
|
|
class BalancerChannelState |
|
|
|
|
: public InternallyRefCounted<BalancerChannelState> { |
|
|
|
|
public: |
|
|
|
|
explicit BalancerCallState( |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy); |
|
|
|
|
/// Contains a call to the LB server and all the data related to the call.
|
|
|
|
|
class BalancerCallState : public InternallyRefCounted<BalancerCallState> { |
|
|
|
|
public: |
|
|
|
|
explicit BalancerCallState(RefCountedPtr<BalancerChannelState> lb_chand); |
|
|
|
|
|
|
|
|
|
// It's the caller's responsibility to ensure that Orphan() is called from
|
|
|
|
|
// inside the combiner.
|
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
void StartQuery(); |
|
|
|
|
|
|
|
|
|
XdsLbClientStats* client_stats() const { return client_stats_.get(); } |
|
|
|
|
// It's the caller's responsibility to ensure that Orphan() is called from
|
|
|
|
|
// inside the combiner.
|
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
bool seen_initial_response() const { return seen_initial_response_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
// So Delete() can access our private dtor.
|
|
|
|
|
template <typename T> |
|
|
|
|
friend void grpc_core::Delete(T*); |
|
|
|
|
void StartQuery(); |
|
|
|
|
|
|
|
|
|
~BalancerCallState(); |
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats() const { |
|
|
|
|
return client_stats_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb* xdslb_policy() const { |
|
|
|
|
return static_cast<XdsLb*>(xdslb_policy_.get()); |
|
|
|
|
} |
|
|
|
|
bool seen_initial_response() const { return seen_initial_response_; } |
|
|
|
|
|
|
|
|
|
void ScheduleNextClientLoadReportLocked(); |
|
|
|
|
void SendClientLoadReportLocked(); |
|
|
|
|
private: |
|
|
|
|
// So Delete() can access our private dtor.
|
|
|
|
|
template <typename T> |
|
|
|
|
friend void grpc_core::Delete(T*); |
|
|
|
|
|
|
|
|
|
static bool LoadReportCountersAreZero(xds_grpclb_request* request); |
|
|
|
|
~BalancerCallState(); |
|
|
|
|
|
|
|
|
|
static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnInitialRequestSentLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
XdsLb* xdslb_policy() const { return lb_chand_->xdslb_policy_.get(); } |
|
|
|
|
|
|
|
|
|
// The owning LB policy.
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy> xdslb_policy_; |
|
|
|
|
bool IsCurrentCallOnChannel() const { |
|
|
|
|
return this == lb_chand_->lb_calld_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The streaming call to the LB server. Always non-NULL.
|
|
|
|
|
grpc_call* lb_call_ = nullptr; |
|
|
|
|
void ScheduleNextClientLoadReportLocked(); |
|
|
|
|
void SendClientLoadReportLocked(); |
|
|
|
|
|
|
|
|
|
static bool LoadReportCountersAreZero(xds_grpclb_request* request); |
|
|
|
|
|
|
|
|
|
static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnInitialRequestSentLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// The owning LB channel.
|
|
|
|
|
RefCountedPtr<BalancerChannelState> lb_chand_; |
|
|
|
|
|
|
|
|
|
// The streaming call to the LB server. Always non-NULL.
|
|
|
|
|
grpc_call* lb_call_ = nullptr; |
|
|
|
|
|
|
|
|
|
// recv_initial_metadata
|
|
|
|
|
grpc_metadata_array lb_initial_metadata_recv_; |
|
|
|
|
|
|
|
|
|
// send_message
|
|
|
|
|
grpc_byte_buffer* send_message_payload_ = nullptr; |
|
|
|
|
grpc_closure lb_on_initial_request_sent_; |
|
|
|
|
|
|
|
|
|
// recv_message
|
|
|
|
|
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
|
|
|
grpc_closure lb_on_balancer_message_received_; |
|
|
|
|
bool seen_initial_response_ = false; |
|
|
|
|
|
|
|
|
|
// recv_trailing_metadata
|
|
|
|
|
grpc_closure lb_on_balancer_status_received_; |
|
|
|
|
grpc_metadata_array lb_trailing_metadata_recv_; |
|
|
|
|
grpc_status_code lb_call_status_; |
|
|
|
|
grpc_slice lb_call_status_details_; |
|
|
|
|
|
|
|
|
|
// The stats for client-side load reporting associated with this LB call.
|
|
|
|
|
// Created after the first serverlist is received.
|
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats_; |
|
|
|
|
grpc_millis client_stats_report_interval_ = 0; |
|
|
|
|
grpc_timer client_load_report_timer_; |
|
|
|
|
bool client_load_report_timer_callback_pending_ = false; |
|
|
|
|
bool last_client_load_report_counters_were_zero_ = false; |
|
|
|
|
bool client_load_report_is_due_ = false; |
|
|
|
|
// The closure used for either the load report timer or the callback for
|
|
|
|
|
// completion of sending the load report.
|
|
|
|
|
grpc_closure client_load_report_closure_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
BalancerChannelState(const char* balancer_name, |
|
|
|
|
const grpc_channel_args& args, |
|
|
|
|
RefCountedPtr<XdsLb> parent_xdslb_policy); |
|
|
|
|
~BalancerChannelState(); |
|
|
|
|
|
|
|
|
|
// recv_initial_metadata
|
|
|
|
|
grpc_metadata_array lb_initial_metadata_recv_; |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
// send_message
|
|
|
|
|
grpc_byte_buffer* send_message_payload_ = nullptr; |
|
|
|
|
grpc_closure lb_on_initial_request_sent_; |
|
|
|
|
grpc_channel* channel() const { return channel_; } |
|
|
|
|
BalancerCallState* lb_calld() const { return lb_calld_.get(); } |
|
|
|
|
|
|
|
|
|
// recv_message
|
|
|
|
|
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
|
|
|
grpc_closure lb_on_balancer_message_received_; |
|
|
|
|
bool seen_initial_response_ = false; |
|
|
|
|
bool IsCurrentChannel() const { |
|
|
|
|
return this == xdslb_policy_->lb_chand_.get(); |
|
|
|
|
} |
|
|
|
|
bool IsPendingChannel() const { |
|
|
|
|
return this == xdslb_policy_->pending_lb_chand_.get(); |
|
|
|
|
} |
|
|
|
|
bool HasActiveCall() const { return lb_calld_ != nullptr; } |
|
|
|
|
|
|
|
|
|
// recv_trailing_metadata
|
|
|
|
|
grpc_closure lb_on_balancer_status_received_; |
|
|
|
|
grpc_metadata_array lb_trailing_metadata_recv_; |
|
|
|
|
grpc_status_code lb_call_status_; |
|
|
|
|
grpc_slice lb_call_status_details_; |
|
|
|
|
void StartCallRetryTimerLocked(); |
|
|
|
|
static void OnCallRetryTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void StartCallLocked(); |
|
|
|
|
|
|
|
|
|
// The stats for client-side load reporting associated with this LB call.
|
|
|
|
|
// Created after the first serverlist is received.
|
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats_; |
|
|
|
|
grpc_millis client_stats_report_interval_ = 0; |
|
|
|
|
grpc_timer client_load_report_timer_; |
|
|
|
|
bool client_load_report_timer_callback_pending_ = false; |
|
|
|
|
bool last_client_load_report_counters_were_zero_ = false; |
|
|
|
|
bool client_load_report_is_due_ = false; |
|
|
|
|
// The closure used for either the load report timer or the callback for
|
|
|
|
|
// completion of sending the load report.
|
|
|
|
|
grpc_closure client_load_report_closure_; |
|
|
|
|
private: |
|
|
|
|
// The owning LB policy.
|
|
|
|
|
RefCountedPtr<XdsLb> xdslb_policy_; |
|
|
|
|
|
|
|
|
|
// The channel and its status.
|
|
|
|
|
grpc_channel* channel_; |
|
|
|
|
bool shutting_down_ = false; |
|
|
|
|
|
|
|
|
|
// The data associated with the current LB call. It holds a ref to this LB
|
|
|
|
|
// channel. It's instantiated every time we query for backends. It's reset
|
|
|
|
|
// whenever the current LB call is no longer needed (e.g., the LB policy is
|
|
|
|
|
// shutting down, or the LB call has ended). A non-NULL lb_calld_ always
|
|
|
|
|
// contains a non-NULL lb_call_.
|
|
|
|
|
OrphanablePtr<BalancerCallState> lb_calld_; |
|
|
|
|
BackOff lb_call_backoff_; |
|
|
|
|
grpc_timer lb_call_retry_timer_; |
|
|
|
|
grpc_closure lb_on_call_retry_; |
|
|
|
|
bool retry_timer_callback_pending_ = false; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class Picker : public SubchannelPicker { |
|
|
|
@ -245,13 +295,13 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
// found. Does nothing upon failure.
|
|
|
|
|
void ParseLbConfig(Config* xds_config); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the balancer channel and call.
|
|
|
|
|
void StartBalancerCallLocked(); |
|
|
|
|
BalancerChannelState* LatestLbChannel() const { |
|
|
|
|
return pending_lb_chand_ != nullptr ? pending_lb_chand_.get() |
|
|
|
|
: lb_chand_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Callback to enter fallback mode.
|
|
|
|
|
static void OnFallbackTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void StartBalancerCallRetryTimerLocked(); |
|
|
|
|
static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnBalancerChannelConnectivityChangedLocked(void* arg, |
|
|
|
|
grpc_error* error); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the child policy.
|
|
|
|
|
void CreateOrUpdateChildPolicyLocked(); |
|
|
|
@ -271,30 +321,15 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
bool shutting_down_ = false; |
|
|
|
|
|
|
|
|
|
// The channel for communicating with the LB server.
|
|
|
|
|
grpc_channel* lb_channel_ = nullptr; |
|
|
|
|
OrphanablePtr<BalancerChannelState> lb_chand_; |
|
|
|
|
OrphanablePtr<BalancerChannelState> pending_lb_chand_; |
|
|
|
|
// Mutex to protect the channel to the LB server. This is used when
|
|
|
|
|
// processing a channelz request.
|
|
|
|
|
gpr_mu lb_channel_mu_; |
|
|
|
|
grpc_connectivity_state lb_channel_connectivity_; |
|
|
|
|
grpc_closure lb_channel_on_connectivity_changed_; |
|
|
|
|
// Are we already watching the LB channel's connectivity?
|
|
|
|
|
bool watching_lb_channel_ = false; |
|
|
|
|
// Response generator to inject address updates into lb_channel_.
|
|
|
|
|
RefCountedPtr<FakeResolverResponseGenerator> response_generator_; |
|
|
|
|
|
|
|
|
|
// The data associated with the current LB call. It holds a ref to this LB
|
|
|
|
|
// policy. It's initialized every time we query for backends. It's reset to
|
|
|
|
|
// NULL whenever the current LB call is no longer needed (e.g., the LB policy
|
|
|
|
|
// is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
|
|
|
|
|
// contains a non-NULL lb_call_.
|
|
|
|
|
OrphanablePtr<BalancerCallState> lb_calld_; |
|
|
|
|
// TODO(juanlishen): Replace this with atomic.
|
|
|
|
|
gpr_mu lb_chand_mu_; |
|
|
|
|
|
|
|
|
|
// Timeout in milliseconds for the LB call. 0 means no deadline.
|
|
|
|
|
int lb_call_timeout_ms_ = 0; |
|
|
|
|
// Balancer call retry state.
|
|
|
|
|
BackOff lb_call_backoff_; |
|
|
|
|
bool retry_timer_callback_pending_ = false; |
|
|
|
|
grpc_timer lb_call_retry_timer_; |
|
|
|
|
grpc_closure lb_on_call_retry_; |
|
|
|
|
|
|
|
|
|
// The deserialized response from the balancer. May be nullptr until one
|
|
|
|
|
// such response has arrived.
|
|
|
|
@ -360,11 +395,11 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state, |
|
|
|
|
// TODO(juanlishen): When in fallback mode, pass the child picker
|
|
|
|
|
// through without wrapping it. (Or maybe use a different helper for
|
|
|
|
|
// the fallback policy?)
|
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats; |
|
|
|
|
if (parent_->lb_calld_ != nullptr && |
|
|
|
|
parent_->lb_calld_->client_stats() != nullptr) { |
|
|
|
|
client_stats = parent_->lb_calld_->client_stats()->Ref(); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(parent_->lb_chand_ != nullptr); |
|
|
|
|
RefCountedPtr<XdsLbClientStats> client_stats = |
|
|
|
|
parent_->lb_chand_->lb_calld() == nullptr |
|
|
|
|
? nullptr |
|
|
|
|
: parent_->lb_chand_->lb_calld()->client_stats(); |
|
|
|
|
parent_->channel_control_helper()->UpdateState( |
|
|
|
|
state, state_error, |
|
|
|
|
UniquePtr<SubchannelPicker>( |
|
|
|
@ -379,12 +414,13 @@ void XdsLb::Helper::RequestReresolution() { |
|
|
|
|
"(%p).", |
|
|
|
|
parent_.get(), parent_->child_policy_.get()); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(parent_->lb_chand_ != nullptr); |
|
|
|
|
// If we are talking to a balancer, we expect to get updated addresses
|
|
|
|
|
// from the balancer, so we can ignore the re-resolution request from
|
|
|
|
|
// the RR policy. Otherwise, pass the re-resolution request up to the
|
|
|
|
|
// the child policy. Otherwise, pass the re-resolution request up to the
|
|
|
|
|
// channel.
|
|
|
|
|
if (parent_->lb_calld_ == nullptr || |
|
|
|
|
!parent_->lb_calld_->seen_initial_response()) { |
|
|
|
|
if (parent_->lb_chand_->lb_calld() == nullptr || |
|
|
|
|
!parent_->lb_chand_->lb_calld()->seen_initial_response()) { |
|
|
|
|
parent_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -465,14 +501,98 @@ UniquePtr<ServerAddressList> ProcessServerlist( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// XdsLb::BalancerCallState
|
|
|
|
|
// XdsLb::BalancerChannelState
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::BalancerCallState::BalancerCallState( |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy) |
|
|
|
|
XdsLb::BalancerChannelState::BalancerChannelState( |
|
|
|
|
const char* balancer_name, const grpc_channel_args& args, |
|
|
|
|
grpc_core::RefCountedPtr<grpc_core::XdsLb> parent_xdslb_policy) |
|
|
|
|
: InternallyRefCounted<BalancerChannelState>(&grpc_lb_xds_trace), |
|
|
|
|
xdslb_policy_(std::move(parent_xdslb_policy)), |
|
|
|
|
lb_call_backoff_( |
|
|
|
|
BackOff::Options() |
|
|
|
|
.set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS * |
|
|
|
|
1000) |
|
|
|
|
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) |
|
|
|
|
.set_jitter(GRPC_XDS_RECONNECT_JITTER) |
|
|
|
|
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { |
|
|
|
|
channel_ = xdslb_policy_->channel_control_helper()->CreateChannel( |
|
|
|
|
balancer_name, args); |
|
|
|
|
GPR_ASSERT(channel_ != nullptr); |
|
|
|
|
StartCallLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::BalancerChannelState::~BalancerChannelState() { |
|
|
|
|
grpc_channel_destroy(channel_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerChannelState::Orphan() { |
|
|
|
|
shutting_down_ = true; |
|
|
|
|
lb_calld_.reset(); |
|
|
|
|
if (retry_timer_callback_pending_) grpc_timer_cancel(&lb_call_retry_timer_); |
|
|
|
|
Unref(DEBUG_LOCATION, "lb_channel_orphaned"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerChannelState::StartCallRetryTimerLocked() { |
|
|
|
|
grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Failed to connect to LB server (lb_chand: %p)...", |
|
|
|
|
xdslb_policy_.get(), this); |
|
|
|
|
grpc_millis timeout = next_try - ExecCtx::Get()->Now(); |
|
|
|
|
if (timeout > 0) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.", |
|
|
|
|
xdslb_policy_.get(), timeout); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active immediately.", |
|
|
|
|
xdslb_policy_.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer").release(); |
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &OnCallRetryTimerLocked, this, |
|
|
|
|
grpc_combiner_scheduler(xdslb_policy_->combiner())); |
|
|
|
|
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); |
|
|
|
|
retry_timer_callback_pending_ = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerChannelState::OnCallRetryTimerLocked(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
BalancerChannelState* lb_chand = static_cast<BalancerChannelState*>(arg); |
|
|
|
|
lb_chand->retry_timer_callback_pending_ = false; |
|
|
|
|
if (!lb_chand->shutting_down_ && error == GRPC_ERROR_NONE && |
|
|
|
|
lb_chand->lb_calld_ == nullptr) { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Restarting call to LB server (lb_chand: %p)", |
|
|
|
|
lb_chand->xdslb_policy_.get(), lb_chand); |
|
|
|
|
} |
|
|
|
|
lb_chand->StartCallLocked(); |
|
|
|
|
} |
|
|
|
|
lb_chand->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerChannelState::StartCallLocked() { |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
GPR_ASSERT(channel_ != nullptr); |
|
|
|
|
GPR_ASSERT(lb_calld_ == nullptr); |
|
|
|
|
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref()); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Query for backends (lb_chand: %p, lb_calld: %p)", |
|
|
|
|
xdslb_policy_.get(), this, lb_calld_.get()); |
|
|
|
|
} |
|
|
|
|
lb_calld_->StartQuery(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// XdsLb::BalancerChannelState::BalancerCallState
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::BalancerChannelState::BalancerCallState::BalancerCallState( |
|
|
|
|
RefCountedPtr<BalancerChannelState> lb_chand) |
|
|
|
|
: InternallyRefCounted<BalancerCallState>(&grpc_lb_xds_trace), |
|
|
|
|
xdslb_policy_(std::move(parent_xdslb_policy)) { |
|
|
|
|
GPR_ASSERT(xdslb_policy_ != nullptr); |
|
|
|
|
lb_chand_(std::move(lb_chand)) { |
|
|
|
|
GPR_ASSERT(xdslb_policy() != nullptr); |
|
|
|
|
GPR_ASSERT(!xdslb_policy()->shutting_down_); |
|
|
|
|
// Init the LB call. Note that the LB call will progress every time there's
|
|
|
|
|
// activity in xdslb_policy_->interested_parties(), which is comprised of
|
|
|
|
@ -484,8 +604,8 @@ XdsLb::BalancerCallState::BalancerCallState( |
|
|
|
|
? GRPC_MILLIS_INF_FUTURE |
|
|
|
|
: ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_; |
|
|
|
|
lb_call_ = grpc_channel_create_pollset_set_call( |
|
|
|
|
xdslb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, |
|
|
|
|
xdslb_policy_->interested_parties(), |
|
|
|
|
lb_chand_->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, |
|
|
|
|
xdslb_policy()->interested_parties(), |
|
|
|
|
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, |
|
|
|
|
nullptr, deadline, nullptr); |
|
|
|
|
// Init the LB call request payload.
|
|
|
|
@ -509,7 +629,7 @@ XdsLb::BalancerCallState::BalancerCallState( |
|
|
|
|
grpc_combiner_scheduler(xdslb_policy()->combiner())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::BalancerCallState::~BalancerCallState() { |
|
|
|
|
XdsLb::BalancerChannelState::BalancerCallState::~BalancerCallState() { |
|
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
|
grpc_call_unref(lb_call_); |
|
|
|
|
grpc_metadata_array_destroy(&lb_initial_metadata_recv_); |
|
|
|
@ -519,7 +639,7 @@ XdsLb::BalancerCallState::~BalancerCallState() { |
|
|
|
|
grpc_slice_unref_internal(lb_call_status_details_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerCallState::Orphan() { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState::Orphan() { |
|
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
|
// If we are here because xdslb_policy wants to cancel the call,
|
|
|
|
|
// lb_on_balancer_status_received_ will complete the cancellation and clean
|
|
|
|
@ -534,11 +654,11 @@ void XdsLb::BalancerCallState::Orphan() { |
|
|
|
|
// in lb_on_balancer_status_received_ instead of here.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerCallState::StartQuery() { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState::StartQuery() { |
|
|
|
|
GPR_ASSERT(lb_call_ != nullptr); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Starting LB call (lb_calld: %p, lb_call: %p)", |
|
|
|
|
xdslb_policy_.get(), this, lb_call_); |
|
|
|
|
xdslb_policy(), this, lb_call_); |
|
|
|
|
} |
|
|
|
|
// Create the ops.
|
|
|
|
|
grpc_call_error call_error; |
|
|
|
@ -606,7 +726,8 @@ void XdsLb::BalancerCallState::StartQuery() { |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
ScheduleNextClientLoadReportLocked() { |
|
|
|
|
const grpc_millis next_client_load_report_time = |
|
|
|
|
ExecCtx::Get()->Now() + client_stats_report_interval_; |
|
|
|
|
GRPC_CLOSURE_INIT(&client_load_report_closure_, |
|
|
|
@ -617,12 +738,11 @@ void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { |
|
|
|
|
client_load_report_timer_callback_pending_ = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
MaybeSendClientLoadReportLocked(void* arg, grpc_error* error) { |
|
|
|
|
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
|
|
|
|
XdsLb* xdslb_policy = lb_calld->xdslb_policy(); |
|
|
|
|
lb_calld->client_load_report_timer_callback_pending_ = false; |
|
|
|
|
if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) { |
|
|
|
|
if (error != GRPC_ERROR_NONE || !lb_calld->IsCurrentCallOnChannel()) { |
|
|
|
|
lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -636,7 +756,7 @@ void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked( |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool XdsLb::BalancerCallState::LoadReportCountersAreZero( |
|
|
|
|
bool XdsLb::BalancerChannelState::BalancerCallState::LoadReportCountersAreZero( |
|
|
|
|
xds_grpclb_request* request) { |
|
|
|
|
XdsLbClientStats::DroppedCallCounts* drop_entries = |
|
|
|
|
static_cast<XdsLbClientStats::DroppedCallCounts*>( |
|
|
|
@ -650,7 +770,8 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO(vpowar): Use LRS to send the client Load Report.
|
|
|
|
|
void XdsLb::BalancerCallState::SendClientLoadReportLocked() { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
SendClientLoadReportLocked() { |
|
|
|
|
// Construct message payload.
|
|
|
|
|
GPR_ASSERT(send_message_payload_ == nullptr); |
|
|
|
|
xds_grpclb_request* request = |
|
|
|
@ -671,27 +792,27 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() { |
|
|
|
|
xds_grpclb_request_destroy(request); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState::OnInitialRequestSentLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
|
|
|
|
grpc_byte_buffer_destroy(lb_calld->send_message_payload_); |
|
|
|
|
lb_calld->send_message_payload_ = nullptr; |
|
|
|
|
// If we attempted to send a client load report before the initial request was
|
|
|
|
|
// sent (and this lb_calld is still in use), send the load report now.
|
|
|
|
|
if (lb_calld->client_load_report_is_due_ && |
|
|
|
|
lb_calld == lb_calld->xdslb_policy()->lb_calld_.get()) { |
|
|
|
|
lb_calld->IsCurrentCallOnChannel()) { |
|
|
|
|
lb_calld->SendClientLoadReportLocked(); |
|
|
|
|
lb_calld->client_load_report_is_due_ = false; |
|
|
|
|
} |
|
|
|
|
lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
OnBalancerMessageReceivedLocked(void* arg, grpc_error* error) { |
|
|
|
|
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
|
|
|
|
XdsLb* xdslb_policy = lb_calld->xdslb_policy(); |
|
|
|
|
// Empty payload means the LB call was cancelled.
|
|
|
|
|
if (lb_calld != xdslb_policy->lb_calld_.get() || |
|
|
|
|
if (!lb_calld->IsCurrentCallOnChannel() || |
|
|
|
|
lb_calld->recv_message_payload_ == nullptr) { |
|
|
|
|
lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); |
|
|
|
|
return; |
|
|
|
@ -709,20 +830,25 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( |
|
|
|
|
nullptr) { |
|
|
|
|
// Have NOT seen initial response, look for initial response.
|
|
|
|
|
if (initial_response->has_client_stats_report_interval) { |
|
|
|
|
lb_calld->client_stats_report_interval_ = GPR_MAX( |
|
|
|
|
GPR_MS_PER_SEC, xds_grpclb_duration_to_millis( |
|
|
|
|
&initial_response->client_stats_report_interval)); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
const grpc_millis interval = xds_grpclb_duration_to_millis( |
|
|
|
|
&initial_response->client_stats_report_interval); |
|
|
|
|
if (interval > 0) { |
|
|
|
|
lb_calld->client_stats_report_interval_ = |
|
|
|
|
GPR_MAX(GPR_MS_PER_SEC, interval); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
if (lb_calld->client_stats_report_interval_ != 0) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Received initial LB response message; " |
|
|
|
|
"client load reporting interval = %" PRId64 " milliseconds", |
|
|
|
|
xdslb_policy, lb_calld->client_stats_report_interval_); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Received initial LB response message; client load " |
|
|
|
|
"reporting NOT enabled", |
|
|
|
|
xdslb_policy); |
|
|
|
|
} |
|
|
|
|
} else if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Received initial LB response message; client load " |
|
|
|
|
"reporting NOT enabled", |
|
|
|
|
xdslb_policy); |
|
|
|
|
} |
|
|
|
|
xds_grpclb_initial_response_destroy(initial_response); |
|
|
|
|
lb_calld->seen_initial_response_ = true; |
|
|
|
@ -745,7 +871,23 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* update serverlist */ |
|
|
|
|
// TODO(juanlishen): Don't ingore empty serverlist.
|
|
|
|
|
if (serverlist->num_servers > 0) { |
|
|
|
|
// Pending LB channel receives a serverlist; promote it.
|
|
|
|
|
// Note that this call can't be on a discarded pending channel, because
|
|
|
|
|
// such channels don't have any current call but we have checked this call
|
|
|
|
|
// is a current call.
|
|
|
|
|
if (!lb_calld->lb_chand_->IsCurrentChannel()) { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Promoting pending LB channel %p to replace " |
|
|
|
|
"current LB channel %p", |
|
|
|
|
xdslb_policy, lb_calld->lb_chand_.get(), |
|
|
|
|
lb_calld->xdslb_policy()->lb_chand_.get()); |
|
|
|
|
} |
|
|
|
|
lb_calld->xdslb_policy()->lb_chand_ = |
|
|
|
|
std::move(lb_calld->xdslb_policy()->pending_lb_chand_); |
|
|
|
|
} |
|
|
|
|
// Start sending client load report only after we start using the
|
|
|
|
|
// serverlist returned from the current LB call.
|
|
|
|
|
if (lb_calld->client_stats_report_interval_ > 0 && |
|
|
|
@ -818,37 +960,53 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
OnBalancerStatusReceivedLocked(void* arg, grpc_error* error) { |
|
|
|
|
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
|
|
|
|
XdsLb* xdslb_policy = lb_calld->xdslb_policy(); |
|
|
|
|
BalancerChannelState* lb_chand = lb_calld->lb_chand_.get(); |
|
|
|
|
GPR_ASSERT(lb_calld->lb_call_ != nullptr); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
char* status_details = |
|
|
|
|
grpc_slice_to_c_string(lb_calld->lb_call_status_details_); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Status from LB server received. Status = %d, details " |
|
|
|
|
"= '%s', (lb_calld: %p, lb_call: %p), error '%s'", |
|
|
|
|
xdslb_policy, lb_calld->lb_call_status_, status_details, lb_calld, |
|
|
|
|
lb_calld->lb_call_, grpc_error_string(error)); |
|
|
|
|
"= '%s', (lb_chand: %p, lb_calld: %p, lb_call: %p), error '%s'", |
|
|
|
|
xdslb_policy, lb_calld->lb_call_status_, status_details, lb_chand, |
|
|
|
|
lb_calld, lb_calld->lb_call_, grpc_error_string(error)); |
|
|
|
|
gpr_free(status_details); |
|
|
|
|
} |
|
|
|
|
// If this lb_calld is still in use, this call ended because of a failure so
|
|
|
|
|
// we want to retry connecting. Otherwise, we have deliberately ended this
|
|
|
|
|
// call and no further action is required.
|
|
|
|
|
if (lb_calld == xdslb_policy->lb_calld_.get()) { |
|
|
|
|
xdslb_policy->lb_calld_.reset(); |
|
|
|
|
// Ignore status from a stale call.
|
|
|
|
|
if (lb_calld->IsCurrentCallOnChannel()) { |
|
|
|
|
// Because this call is the current one on the channel, the channel can't
|
|
|
|
|
// have been swapped out; otherwise, the call should have been reset.
|
|
|
|
|
GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel()); |
|
|
|
|
GPR_ASSERT(!xdslb_policy->shutting_down_); |
|
|
|
|
xdslb_policy->channel_control_helper()->RequestReresolution(); |
|
|
|
|
if (lb_calld->seen_initial_response_) { |
|
|
|
|
// If we lose connection to the LB server, reset the backoff and restart
|
|
|
|
|
// the LB call immediately.
|
|
|
|
|
xdslb_policy->lb_call_backoff_.Reset(); |
|
|
|
|
xdslb_policy->StartBalancerCallLocked(); |
|
|
|
|
if (lb_chand != xdslb_policy->LatestLbChannel()) { |
|
|
|
|
// This channel must be the current one and there is a pending one. Swap
|
|
|
|
|
// in the pending one and we are done.
|
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Promoting pending LB channel %p to replace " |
|
|
|
|
"current LB channel %p", |
|
|
|
|
xdslb_policy, lb_calld->lb_chand_.get(), |
|
|
|
|
lb_calld->xdslb_policy()->lb_chand_.get()); |
|
|
|
|
} |
|
|
|
|
xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_); |
|
|
|
|
} else { |
|
|
|
|
// If this LB call fails establishing any connection to the LB server,
|
|
|
|
|
// retry later.
|
|
|
|
|
xdslb_policy->StartBalancerCallRetryTimerLocked(); |
|
|
|
|
// This channel is the most recently created one. Try to restart the call
|
|
|
|
|
// and reresolve.
|
|
|
|
|
lb_chand->lb_calld_.reset(); |
|
|
|
|
if (lb_calld->seen_initial_response_) { |
|
|
|
|
// If we lost connection to the LB server, reset the backoff and restart
|
|
|
|
|
// the LB call immediately.
|
|
|
|
|
lb_chand->lb_call_backoff_.Reset(); |
|
|
|
|
lb_chand->StartCallLocked(); |
|
|
|
|
} else { |
|
|
|
|
// If we failed to connect to the LB server, retry later.
|
|
|
|
|
lb_chand->StartCallRetryTimerLocked(); |
|
|
|
|
} |
|
|
|
|
xdslb_policy->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended"); |
|
|
|
@ -858,53 +1016,23 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
|
|
|
|
// helper code for creating balancer channel
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
UniquePtr<ServerAddressList> ExtractBalancerAddresses( |
|
|
|
|
const ServerAddressList& addresses) { |
|
|
|
|
auto balancer_addresses = MakeUnique<ServerAddressList>(); |
|
|
|
|
for (size_t i = 0; i < addresses.size(); ++i) { |
|
|
|
|
if (addresses[i].IsBalancer()) { |
|
|
|
|
balancer_addresses->emplace_back(addresses[i]); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return balancer_addresses; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Returns the channel args for the LB channel, used to create a bidirectional
|
|
|
|
|
* stream for the reception of load balancing updates. |
|
|
|
|
* |
|
|
|
|
* Inputs: |
|
|
|
|
* - \a addresses: corresponding to the balancers. |
|
|
|
|
* - \a response_generator: in order to propagate updates from the resolver |
|
|
|
|
* above the grpclb policy. |
|
|
|
|
* - \a args: other args inherited from the xds policy. */ |
|
|
|
|
grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
const ServerAddressList& addresses, |
|
|
|
|
FakeResolverResponseGenerator* response_generator, |
|
|
|
|
const grpc_channel_args* args) { |
|
|
|
|
UniquePtr<ServerAddressList> balancer_addresses = |
|
|
|
|
ExtractBalancerAddresses(addresses); |
|
|
|
|
// Channel args to remove.
|
|
|
|
|
// Returns the channel args for the LB channel, used to create a bidirectional
|
|
|
|
|
// stream for the reception of load balancing updates.
|
|
|
|
|
grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) { |
|
|
|
|
static const char* args_to_remove[] = { |
|
|
|
|
// LB policy name, since we want to use the default (pick_first) in
|
|
|
|
|
// the LB channel.
|
|
|
|
|
GRPC_ARG_LB_POLICY_NAME, |
|
|
|
|
// The service config that contains the LB config. We don't want to
|
|
|
|
|
// recursively use xds in the LB channel.
|
|
|
|
|
GRPC_ARG_SERVICE_CONFIG, |
|
|
|
|
// The channel arg for the server URI, since that will be different for
|
|
|
|
|
// the LB channel than for the parent channel. The client channel
|
|
|
|
|
// factory will re-add this arg with the right value.
|
|
|
|
|
GRPC_ARG_SERVER_URI, |
|
|
|
|
// The resolved addresses, which will be generated by the name resolver
|
|
|
|
|
// used in the LB channel. Note that the LB channel will use the fake
|
|
|
|
|
// resolver, so this won't actually generate a query to DNS (or some
|
|
|
|
|
// other name service). However, the addresses returned by the fake
|
|
|
|
|
// resolver will have is_balancer=false, whereas our own addresses have
|
|
|
|
|
// is_balancer=true. We need the LB channel to return addresses with
|
|
|
|
|
// is_balancer=false so that it does not wind up recursively using the
|
|
|
|
|
// xds LB policy, as per the special case logic in client_channel.c.
|
|
|
|
|
// used in the LB channel.
|
|
|
|
|
GRPC_ARG_SERVER_ADDRESS_LIST, |
|
|
|
|
// The fake resolver response generator, because we are replacing it
|
|
|
|
|
// with the one from the xds policy, used to propagate updates to
|
|
|
|
|
// the LB channel.
|
|
|
|
|
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, |
|
|
|
|
// The LB channel should use the authority indicated by the target
|
|
|
|
|
// authority table (see \a grpc_lb_policy_xds_modify_lb_channel_args),
|
|
|
|
|
// as opposed to the authority from the parent channel.
|
|
|
|
@ -916,14 +1044,6 @@ grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
}; |
|
|
|
|
// Channel args to add.
|
|
|
|
|
const grpc_arg args_to_add[] = { |
|
|
|
|
// New server address list.
|
|
|
|
|
// Note that we pass these in both when creating the LB channel
|
|
|
|
|
// and via the fake resolver. The latter is what actually gets used.
|
|
|
|
|
CreateServerAddressListChannelArg(balancer_addresses.get()), |
|
|
|
|
// The fake resolver response generator, which we use to inject
|
|
|
|
|
// address updates into the LB channel.
|
|
|
|
|
grpc_core::FakeResolverResponseGenerator::MakeChannelArg( |
|
|
|
|
response_generator), |
|
|
|
|
// A channel arg indicating the target is a xds load balancer.
|
|
|
|
|
grpc_channel_arg_integer_create( |
|
|
|
|
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1), |
|
|
|
@ -944,21 +1064,8 @@ grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
// ctor and dtor
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::XdsLb(Args args) |
|
|
|
|
: LoadBalancingPolicy(std::move(args)), |
|
|
|
|
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), |
|
|
|
|
lb_call_backoff_( |
|
|
|
|
BackOff::Options() |
|
|
|
|
.set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS * |
|
|
|
|
1000) |
|
|
|
|
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) |
|
|
|
|
.set_jitter(GRPC_XDS_RECONNECT_JITTER) |
|
|
|
|
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { |
|
|
|
|
// Initialization.
|
|
|
|
|
gpr_mu_init(&lb_channel_mu_); |
|
|
|
|
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, |
|
|
|
|
&XdsLb::OnBalancerChannelConnectivityChangedLocked, this, |
|
|
|
|
grpc_combiner_scheduler(args.combiner)); |
|
|
|
|
XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) { |
|
|
|
|
gpr_mu_init(&lb_chand_mu_); |
|
|
|
|
// Record server name.
|
|
|
|
|
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); |
|
|
|
|
const char* server_uri = grpc_channel_arg_get_string(arg); |
|
|
|
@ -982,7 +1089,7 @@ XdsLb::XdsLb(Args args) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsLb::~XdsLb() { |
|
|
|
|
gpr_mu_destroy(&lb_channel_mu_); |
|
|
|
|
gpr_mu_destroy(&lb_chand_mu_); |
|
|
|
|
gpr_free((void*)server_name_); |
|
|
|
|
grpc_channel_args_destroy(args_); |
|
|
|
|
if (serverlist_ != nullptr) { |
|
|
|
@ -992,10 +1099,6 @@ XdsLb::~XdsLb() { |
|
|
|
|
|
|
|
|
|
void XdsLb::ShutdownLocked() { |
|
|
|
|
shutting_down_ = true; |
|
|
|
|
lb_calld_.reset(); |
|
|
|
|
if (retry_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&lb_call_retry_timer_); |
|
|
|
|
} |
|
|
|
|
if (fallback_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&lb_fallback_timer_); |
|
|
|
|
} |
|
|
|
@ -1004,11 +1107,10 @@ void XdsLb::ShutdownLocked() { |
|
|
|
|
// destroying the channel triggers a last callback to
|
|
|
|
|
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
|
|
|
|
|
// alive when that callback is invoked.
|
|
|
|
|
if (lb_channel_ != nullptr) { |
|
|
|
|
gpr_mu_lock(&lb_channel_mu_); |
|
|
|
|
grpc_channel_destroy(lb_channel_); |
|
|
|
|
lb_channel_ = nullptr; |
|
|
|
|
gpr_mu_unlock(&lb_channel_mu_); |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&lb_chand_mu_); |
|
|
|
|
lb_chand_.reset(); |
|
|
|
|
pending_lb_chand_.reset(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1017,8 +1119,11 @@ void XdsLb::ShutdownLocked() { |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void XdsLb::ResetBackoffLocked() { |
|
|
|
|
if (lb_channel_ != nullptr) { |
|
|
|
|
grpc_channel_reset_connect_backoff(lb_channel_); |
|
|
|
|
if (lb_chand_ != nullptr) { |
|
|
|
|
grpc_channel_reset_connect_backoff(lb_chand_->channel()); |
|
|
|
|
} |
|
|
|
|
if (pending_lb_chand_ != nullptr) { |
|
|
|
|
grpc_channel_reset_connect_backoff(pending_lb_chand_->channel()); |
|
|
|
|
} |
|
|
|
|
if (child_policy_ != nullptr) { |
|
|
|
|
child_policy_->ResetBackoffLocked(); |
|
|
|
@ -1027,12 +1132,19 @@ void XdsLb::ResetBackoffLocked() { |
|
|
|
|
|
|
|
|
|
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, |
|
|
|
|
channelz::ChildRefsList* child_channels) { |
|
|
|
|
// delegate to the child_policy_ to fill the children subchannels.
|
|
|
|
|
// Delegate to the child_policy_ to fill the children subchannels.
|
|
|
|
|
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); |
|
|
|
|
MutexLock lock(&lb_channel_mu_); |
|
|
|
|
if (lb_channel_ != nullptr) { |
|
|
|
|
MutexLock lock(&lb_chand_mu_); |
|
|
|
|
if (lb_chand_ != nullptr) { |
|
|
|
|
grpc_core::channelz::ChannelNode* channel_node = |
|
|
|
|
grpc_channel_get_channelz_node(lb_channel_); |
|
|
|
|
grpc_channel_get_channelz_node(lb_chand_->channel()); |
|
|
|
|
if (channel_node != nullptr) { |
|
|
|
|
child_channels->push_back(channel_node->uuid()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (pending_lb_chand_ != nullptr) { |
|
|
|
|
grpc_core::channelz::ChannelNode* channel_node = |
|
|
|
|
grpc_channel_get_channelz_node(pending_lb_chand_->channel()); |
|
|
|
|
if (channel_node != nullptr) { |
|
|
|
|
child_channels->push_back(channel_node->uuid()); |
|
|
|
|
} |
|
|
|
@ -1059,22 +1171,29 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { |
|
|
|
|
args_ = grpc_channel_args_copy_and_add_and_remove( |
|
|
|
|
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); |
|
|
|
|
// Construct args for balancer channel.
|
|
|
|
|
grpc_channel_args* lb_channel_args = |
|
|
|
|
BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); |
|
|
|
|
// Create balancer channel if needed.
|
|
|
|
|
if (lb_channel_ == nullptr) { |
|
|
|
|
char* uri_str; |
|
|
|
|
gpr_asprintf(&uri_str, "fake:///%s", server_name_); |
|
|
|
|
gpr_mu_lock(&lb_channel_mu_); |
|
|
|
|
lb_channel_ = |
|
|
|
|
channel_control_helper()->CreateChannel(uri_str, *lb_channel_args); |
|
|
|
|
gpr_mu_unlock(&lb_channel_mu_); |
|
|
|
|
GPR_ASSERT(lb_channel_ != nullptr); |
|
|
|
|
gpr_free(uri_str); |
|
|
|
|
grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(&args); |
|
|
|
|
// Create an LB channel if we don't have one yet or the balancer name has
|
|
|
|
|
// changed from the last received one.
|
|
|
|
|
bool create_lb_channel = lb_chand_ == nullptr; |
|
|
|
|
if (lb_chand_ != nullptr) { |
|
|
|
|
UniquePtr<char> last_balancer_name( |
|
|
|
|
grpc_channel_get_target(LatestLbChannel()->channel())); |
|
|
|
|
create_lb_channel = |
|
|
|
|
strcmp(last_balancer_name.get(), balancer_name_.get()) != 0; |
|
|
|
|
} |
|
|
|
|
if (create_lb_channel) { |
|
|
|
|
OrphanablePtr<BalancerChannelState> lb_chand = |
|
|
|
|
MakeOrphanable<BalancerChannelState>(balancer_name_.get(), |
|
|
|
|
*lb_channel_args, Ref()); |
|
|
|
|
if (lb_chand_ == nullptr || !lb_chand_->HasActiveCall()) { |
|
|
|
|
GPR_ASSERT(pending_lb_chand_ == nullptr); |
|
|
|
|
// If we do not have a working LB channel yet, use the newly created one.
|
|
|
|
|
lb_chand_ = std::move(lb_chand); |
|
|
|
|
} else { |
|
|
|
|
// Otherwise, wait until the new LB channel to be ready to swap it in.
|
|
|
|
|
pending_lb_chand_ = std::move(lb_chand); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Propagate updates to the LB channel (pick_first) through the fake
|
|
|
|
|
// resolver.
|
|
|
|
|
response_generator_->SetResponse(lb_channel_args); |
|
|
|
|
grpc_channel_args_destroy(lb_channel_args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1114,12 +1233,13 @@ void XdsLb::ParseLbConfig(Config* xds_config) { |
|
|
|
|
|
|
|
|
|
void XdsLb::UpdateLocked(const grpc_channel_args& args, |
|
|
|
|
RefCountedPtr<Config> lb_config) { |
|
|
|
|
const bool is_initial_update = lb_channel_ == nullptr; |
|
|
|
|
const bool is_initial_update = lb_chand_ == nullptr; |
|
|
|
|
ParseLbConfig(lb_config.get()); |
|
|
|
|
// TODO(juanlishen): Pass fallback policy config update after fallback policy
|
|
|
|
|
// is added.
|
|
|
|
|
if (balancer_name_ == nullptr) { |
|
|
|
|
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
ProcessChannelArgsLocked(args); |
|
|
|
|
// Update the existing child policy.
|
|
|
|
@ -1139,24 +1259,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, |
|
|
|
|
fallback_timer_callback_pending_ = true; |
|
|
|
|
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); |
|
|
|
|
} |
|
|
|
|
StartBalancerCallLocked(); |
|
|
|
|
} else if (!watching_lb_channel_) { |
|
|
|
|
// If this is not the initial update and we're not already watching
|
|
|
|
|
// the LB channel's connectivity state, start a watch now. This
|
|
|
|
|
// ensures that we'll know when to switch to a new balancer call.
|
|
|
|
|
lb_channel_connectivity_ = grpc_channel_check_connectivity_state( |
|
|
|
|
lb_channel_, true /* try to connect */); |
|
|
|
|
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( |
|
|
|
|
grpc_channel_get_channel_stack(lb_channel_)); |
|
|
|
|
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); |
|
|
|
|
watching_lb_channel_ = true; |
|
|
|
|
// Ref held by closure.
|
|
|
|
|
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release(); |
|
|
|
|
grpc_client_channel_watch_connectivity_state( |
|
|
|
|
client_channel_elem, |
|
|
|
|
grpc_polling_entity_create_from_pollset_set(interested_parties()), |
|
|
|
|
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_, |
|
|
|
|
nullptr); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1164,20 +1266,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, |
|
|
|
|
// code for balancer channel and call
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void XdsLb::StartBalancerCallLocked() { |
|
|
|
|
GPR_ASSERT(lb_channel_ != nullptr); |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
// Init the LB call data.
|
|
|
|
|
GPR_ASSERT(lb_calld_ == nullptr); |
|
|
|
|
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref()); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Query for backends (lb_channel: %p, lb_calld: %p)", |
|
|
|
|
this, lb_channel_, lb_calld_.get()); |
|
|
|
|
} |
|
|
|
|
lb_calld_->StartQuery(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); |
|
|
|
|
xdslb_policy->fallback_timer_callback_pending_ = false; |
|
|
|
@ -1194,88 +1282,6 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::StartBalancerCallRetryTimerLocked() { |
|
|
|
|
grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Connection to LB server lost...", this); |
|
|
|
|
grpc_millis timeout = next_try - ExecCtx::Get()->Now(); |
|
|
|
|
if (timeout > 0) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.", |
|
|
|
|
this, timeout); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active immediately.", this); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// TODO(roth): We currently track this ref manually. Once the
|
|
|
|
|
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
|
|
|
|
|
// with the callback.
|
|
|
|
|
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); |
|
|
|
|
self.release(); |
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &XdsLb::OnBalancerCallRetryTimerLocked, |
|
|
|
|
this, grpc_combiner_scheduler(combiner())); |
|
|
|
|
retry_timer_callback_pending_ = true; |
|
|
|
|
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); |
|
|
|
|
xdslb_policy->retry_timer_callback_pending_ = false; |
|
|
|
|
if (!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE && |
|
|
|
|
xdslb_policy->lb_calld_ == nullptr) { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Restarting call to LB server", |
|
|
|
|
xdslb_policy); |
|
|
|
|
} |
|
|
|
|
xdslb_policy->StartBalancerCallLocked(); |
|
|
|
|
} |
|
|
|
|
xdslb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Invoked as part of the update process. It continues watching the LB channel
|
|
|
|
|
// until it shuts down or becomes READY. It's invoked even if the LB channel
|
|
|
|
|
// stayed READY throughout the update (for example if the update is identical).
|
|
|
|
|
void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); |
|
|
|
|
if (xdslb_policy->shutting_down_) goto done; |
|
|
|
|
// Re-initialize the lb_call. This should also take care of updating the
|
|
|
|
|
// child policy. Note that the current child policy, if any, will
|
|
|
|
|
// stay in effect until an update from the new lb_call is received.
|
|
|
|
|
switch (xdslb_policy->lb_channel_connectivity_) { |
|
|
|
|
case GRPC_CHANNEL_CONNECTING: |
|
|
|
|
case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
|
|
|
|
// Keep watching the LB channel.
|
|
|
|
|
grpc_channel_element* client_channel_elem = |
|
|
|
|
grpc_channel_stack_last_element( |
|
|
|
|
grpc_channel_get_channel_stack(xdslb_policy->lb_channel_)); |
|
|
|
|
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); |
|
|
|
|
grpc_client_channel_watch_connectivity_state( |
|
|
|
|
client_channel_elem, |
|
|
|
|
grpc_polling_entity_create_from_pollset_set( |
|
|
|
|
xdslb_policy->interested_parties()), |
|
|
|
|
&xdslb_policy->lb_channel_connectivity_, |
|
|
|
|
&xdslb_policy->lb_channel_on_connectivity_changed_, nullptr); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
// The LB channel may be IDLE because it's shut down before the update.
|
|
|
|
|
// Restart the LB call to kick the LB channel into gear.
|
|
|
|
|
case GRPC_CHANNEL_IDLE: |
|
|
|
|
case GRPC_CHANNEL_READY: |
|
|
|
|
xdslb_policy->lb_calld_.reset(); |
|
|
|
|
if (xdslb_policy->retry_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_); |
|
|
|
|
} |
|
|
|
|
xdslb_policy->lb_call_backoff_.Reset(); |
|
|
|
|
xdslb_policy->StartBalancerCallLocked(); |
|
|
|
|
// Fall through.
|
|
|
|
|
case GRPC_CHANNEL_SHUTDOWN: |
|
|
|
|
done: |
|
|
|
|
xdslb_policy->watching_lb_channel_ = false; |
|
|
|
|
xdslb_policy->Unref(DEBUG_LOCATION, |
|
|
|
|
"watch_lb_channel_connectivity_cb_shutdown"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// code for interacting with the child policy
|
|
|
|
|
//
|
|
|
|
@ -1360,18 +1366,6 @@ class XdsFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
public: |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
|
|
|
|
LoadBalancingPolicy::Args args) const override { |
|
|
|
|
/* Count the number of gRPC-LB addresses. There must be at least one. */ |
|
|
|
|
const ServerAddressList* addresses = |
|
|
|
|
FindServerAddressListChannelArg(args.args); |
|
|
|
|
if (addresses == nullptr) return nullptr; |
|
|
|
|
bool found_balancer_address = false; |
|
|
|
|
for (size_t i = 0; i < addresses->size(); ++i) { |
|
|
|
|
if ((*addresses)[i].IsBalancer()) { |
|
|
|
|
found_balancer_address = true; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!found_balancer_address) return nullptr; |
|
|
|
|
return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(std::move(args))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|