|
|
|
@ -68,222 +68,181 @@ namespace grpc_core { |
|
|
|
|
|
|
|
|
|
TraceFlag grpc_xds_client_trace(false, "xds_client"); |
|
|
|
|
|
|
|
|
|
// Contains a channel to the xds server and all the data related to the
|
|
|
|
|
// channel. Holds a ref to the xds client object.
|
|
|
|
|
// TODO(roth): This is separate from the XdsClient object because it was
|
|
|
|
|
// originally designed to be able to swap itself out in case the
|
|
|
|
|
// balancer name changed. Now that the balancer name is going to be
|
|
|
|
|
// coming from the bootstrap file, we don't really need this level of
|
|
|
|
|
// indirection unless we decide to support watching the bootstrap file
|
|
|
|
|
// for changes. At some point, if we decide that we're never going to
|
|
|
|
|
// need to do that, then we can eliminate this class and move its
|
|
|
|
|
// contents directly into the XdsClient class.
|
|
|
|
|
class XdsClient::ChannelState : public InternallyRefCounted<ChannelState> { |
|
|
|
|
//
|
|
|
|
|
// Internal class declarations
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// An xds call wrapper that can restart a call upon failure. Holds a ref to
|
|
|
|
|
// the xds channel. The template parameter is the kind of wrapped xds call.
|
|
|
|
|
template <typename T> |
|
|
|
|
class XdsClient::ChannelState::RetryableCall |
|
|
|
|
: public InternallyRefCounted<RetryableCall<T>> { |
|
|
|
|
public: |
|
|
|
|
// An xds call wrapper that can restart a call upon failure. Holds a ref to
|
|
|
|
|
// the xds channel. The template parameter is the kind of wrapped xds call.
|
|
|
|
|
template <typename T> |
|
|
|
|
class RetryableCall : public InternallyRefCounted<RetryableCall<T>> { |
|
|
|
|
public: |
|
|
|
|
explicit RetryableCall(RefCountedPtr<ChannelState> chand); |
|
|
|
|
explicit RetryableCall(RefCountedPtr<ChannelState> chand); |
|
|
|
|
|
|
|
|
|
void Orphan() override; |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
void OnCallFinishedLocked(); |
|
|
|
|
void OnCallFinishedLocked(); |
|
|
|
|
|
|
|
|
|
T* calld() const { return calld_.get(); } |
|
|
|
|
ChannelState* chand() const { return chand_.get(); } |
|
|
|
|
T* calld() const { return calld_.get(); } |
|
|
|
|
ChannelState* chand() const { return chand_.get(); } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void StartNewCallLocked(); |
|
|
|
|
void StartRetryTimerLocked(); |
|
|
|
|
static void OnRetryTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// The wrapped call that talks to the xds server. It's instantiated
|
|
|
|
|
// every time we start a new call. It's null during call retry backoff.
|
|
|
|
|
OrphanablePtr<T> calld_; |
|
|
|
|
// The owning xds channel.
|
|
|
|
|
RefCountedPtr<ChannelState> chand_; |
|
|
|
|
|
|
|
|
|
// Retry state.
|
|
|
|
|
BackOff backoff_; |
|
|
|
|
grpc_timer retry_timer_; |
|
|
|
|
grpc_closure on_retry_timer_; |
|
|
|
|
bool retry_timer_callback_pending_ = false; |
|
|
|
|
|
|
|
|
|
bool shutting_down_ = false; |
|
|
|
|
}; |
|
|
|
|
bool IsCurrentCallOnChannel() const; |
|
|
|
|
|
|
|
|
|
// Contains an ADS call to the xds server.
|
|
|
|
|
class AdsCallState : public InternallyRefCounted<AdsCallState> { |
|
|
|
|
public: |
|
|
|
|
// The ctor and dtor should not be used directly.
|
|
|
|
|
explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent); |
|
|
|
|
~AdsCallState() override; |
|
|
|
|
private: |
|
|
|
|
void StartNewCallLocked(); |
|
|
|
|
void StartRetryTimerLocked(); |
|
|
|
|
static void OnRetryTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// The wrapped xds call that talks to the xds server. It's instantiated
|
|
|
|
|
// every time we start a new call. It's null during call retry backoff.
|
|
|
|
|
OrphanablePtr<T> calld_; |
|
|
|
|
// The owning xds channel.
|
|
|
|
|
RefCountedPtr<ChannelState> chand_; |
|
|
|
|
|
|
|
|
|
// Retry state.
|
|
|
|
|
BackOff backoff_; |
|
|
|
|
grpc_timer retry_timer_; |
|
|
|
|
grpc_closure on_retry_timer_; |
|
|
|
|
bool retry_timer_callback_pending_ = false; |
|
|
|
|
|
|
|
|
|
void Orphan() override; |
|
|
|
|
bool shutting_down_ = false; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
RetryableCall<AdsCallState>* parent() const { return parent_.get(); } |
|
|
|
|
ChannelState* chand() const { return parent_->chand(); } |
|
|
|
|
XdsClient* xds_client() const { return chand()->xds_client(); } |
|
|
|
|
bool seen_response() const { return seen_response_; } |
|
|
|
|
// Contains an ADS call to the xds server.
|
|
|
|
|
class XdsClient::ChannelState::AdsCallState |
|
|
|
|
: public InternallyRefCounted<AdsCallState> { |
|
|
|
|
public: |
|
|
|
|
// The ctor and dtor should not be used directly.
|
|
|
|
|
explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent); |
|
|
|
|
~AdsCallState() override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
bool IsCurrentCallOnChannel() const; |
|
|
|
|
RetryableCall<AdsCallState>* parent() const { return parent_.get(); } |
|
|
|
|
ChannelState* chand() const { return parent_->chand(); } |
|
|
|
|
XdsClient* xds_client() const { return chand()->xds_client(); } |
|
|
|
|
bool seen_response() const { return seen_response_; } |
|
|
|
|
|
|
|
|
|
// The owning RetryableCall<>.
|
|
|
|
|
RefCountedPtr<RetryableCall<AdsCallState>> parent_; |
|
|
|
|
bool seen_response_ = false; |
|
|
|
|
private: |
|
|
|
|
static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// Always non-NULL.
|
|
|
|
|
grpc_call* call_; |
|
|
|
|
bool IsCurrentCallOnChannel() const; |
|
|
|
|
|
|
|
|
|
// recv_initial_metadata
|
|
|
|
|
grpc_metadata_array initial_metadata_recv_; |
|
|
|
|
// The owning RetryableCall<>.
|
|
|
|
|
RefCountedPtr<RetryableCall<AdsCallState>> parent_; |
|
|
|
|
bool seen_response_ = false; |
|
|
|
|
|
|
|
|
|
// send_message
|
|
|
|
|
grpc_byte_buffer* send_message_payload_ = nullptr; |
|
|
|
|
// Always non-NULL.
|
|
|
|
|
grpc_call* call_; |
|
|
|
|
|
|
|
|
|
// recv_message
|
|
|
|
|
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
|
|
|
grpc_closure on_response_received_; |
|
|
|
|
// recv_initial_metadata
|
|
|
|
|
grpc_metadata_array initial_metadata_recv_; |
|
|
|
|
|
|
|
|
|
// recv_trailing_metadata
|
|
|
|
|
grpc_metadata_array trailing_metadata_recv_; |
|
|
|
|
grpc_status_code status_code_; |
|
|
|
|
grpc_slice status_details_; |
|
|
|
|
grpc_closure on_status_received_; |
|
|
|
|
}; |
|
|
|
|
// send_message
|
|
|
|
|
grpc_byte_buffer* send_message_payload_ = nullptr; |
|
|
|
|
|
|
|
|
|
// Contains an LRS call to the xds server.
|
|
|
|
|
class LrsCallState : public InternallyRefCounted<LrsCallState> { |
|
|
|
|
public: |
|
|
|
|
// The ctor and dtor should not be used directly.
|
|
|
|
|
explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent); |
|
|
|
|
~LrsCallState() override; |
|
|
|
|
// recv_message
|
|
|
|
|
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
|
|
|
grpc_closure on_response_received_; |
|
|
|
|
|
|
|
|
|
void Orphan() override; |
|
|
|
|
// recv_trailing_metadata
|
|
|
|
|
grpc_metadata_array trailing_metadata_recv_; |
|
|
|
|
grpc_status_code status_code_; |
|
|
|
|
grpc_slice status_details_; |
|
|
|
|
grpc_closure on_status_received_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
void MaybeStartReportingLocked(); |
|
|
|
|
// Contains an LRS call to the xds server.
|
|
|
|
|
class XdsClient::ChannelState::LrsCallState |
|
|
|
|
: public InternallyRefCounted<LrsCallState> { |
|
|
|
|
public: |
|
|
|
|
// The ctor and dtor should not be used directly.
|
|
|
|
|
explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent); |
|
|
|
|
~LrsCallState() override; |
|
|
|
|
|
|
|
|
|
RetryableCall<LrsCallState>* parent() { return parent_.get(); } |
|
|
|
|
ChannelState* chand() const { return parent_->chand(); } |
|
|
|
|
XdsClient* xds_client() const { return chand()->xds_client(); } |
|
|
|
|
bool seen_response() const { return seen_response_; } |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
// Reports client-side load stats according to a fixed interval.
|
|
|
|
|
class Reporter : public InternallyRefCounted<Reporter> { |
|
|
|
|
public: |
|
|
|
|
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval) |
|
|
|
|
: parent_(std::move(parent)), report_interval_(report_interval) { |
|
|
|
|
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this, |
|
|
|
|
grpc_combiner_scheduler(xds_client()->combiner_)); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this, |
|
|
|
|
grpc_combiner_scheduler(xds_client()->combiner_)); |
|
|
|
|
ScheduleNextReportLocked(); |
|
|
|
|
} |
|
|
|
|
void MaybeStartReportingLocked(); |
|
|
|
|
|
|
|
|
|
void Orphan() override; |
|
|
|
|
RetryableCall<LrsCallState>* parent() { return parent_.get(); } |
|
|
|
|
ChannelState* chand() const { return parent_->chand(); } |
|
|
|
|
XdsClient* xds_client() const { return chand()->xds_client(); } |
|
|
|
|
bool seen_response() const { return seen_response_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void ScheduleNextReportLocked(); |
|
|
|
|
static void OnNextReportTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void SendReportLocked(); |
|
|
|
|
static void OnReportDoneLocked(void* arg, grpc_error* error); |
|
|
|
|
private: |
|
|
|
|
// Reports client-side load stats according to a fixed interval.
|
|
|
|
|
class Reporter : public InternallyRefCounted<Reporter> { |
|
|
|
|
public: |
|
|
|
|
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval) |
|
|
|
|
: parent_(std::move(parent)), report_interval_(report_interval) { |
|
|
|
|
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this, |
|
|
|
|
grpc_combiner_scheduler(xds_client()->combiner_)); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this, |
|
|
|
|
grpc_combiner_scheduler(xds_client()->combiner_)); |
|
|
|
|
ScheduleNextReportLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool IsCurrentReporterOnCall() const { |
|
|
|
|
return this == parent_->reporter_.get(); |
|
|
|
|
} |
|
|
|
|
XdsClient* xds_client() const { return parent_->xds_client(); } |
|
|
|
|
|
|
|
|
|
// The owning LRS call.
|
|
|
|
|
RefCountedPtr<LrsCallState> parent_; |
|
|
|
|
|
|
|
|
|
// The load reporting state.
|
|
|
|
|
const grpc_millis report_interval_; |
|
|
|
|
bool last_report_counters_were_zero_ = false; |
|
|
|
|
bool next_report_timer_callback_pending_ = false; |
|
|
|
|
grpc_timer next_report_timer_; |
|
|
|
|
grpc_closure on_next_report_timer_; |
|
|
|
|
grpc_closure on_report_done_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static void OnInitialRequestSentLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
bool IsCurrentCallOnChannel() const; |
|
|
|
|
|
|
|
|
|
// The owning RetryableCall<>.
|
|
|
|
|
RefCountedPtr<RetryableCall<LrsCallState>> parent_; |
|
|
|
|
bool seen_response_ = false; |
|
|
|
|
|
|
|
|
|
// Always non-NULL.
|
|
|
|
|
grpc_call* call_; |
|
|
|
|
|
|
|
|
|
// recv_initial_metadata
|
|
|
|
|
grpc_metadata_array initial_metadata_recv_; |
|
|
|
|
|
|
|
|
|
// send_message
|
|
|
|
|
grpc_byte_buffer* send_message_payload_ = nullptr; |
|
|
|
|
grpc_closure on_initial_request_sent_; |
|
|
|
|
|
|
|
|
|
// recv_message
|
|
|
|
|
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
|
|
|
grpc_closure on_response_received_; |
|
|
|
|
|
|
|
|
|
// recv_trailing_metadata
|
|
|
|
|
grpc_metadata_array trailing_metadata_recv_; |
|
|
|
|
grpc_status_code status_code_; |
|
|
|
|
grpc_slice status_details_; |
|
|
|
|
grpc_closure on_status_received_; |
|
|
|
|
|
|
|
|
|
// Load reporting state.
|
|
|
|
|
UniquePtr<char> cluster_name_; |
|
|
|
|
grpc_millis load_reporting_interval_ = 0; |
|
|
|
|
OrphanablePtr<Reporter> reporter_; |
|
|
|
|
}; |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
ChannelState(RefCountedPtr<XdsClient> xds_client, const char* balancer_name, |
|
|
|
|
const grpc_channel_args& args); |
|
|
|
|
~ChannelState(); |
|
|
|
|
private: |
|
|
|
|
void ScheduleNextReportLocked(); |
|
|
|
|
static void OnNextReportTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void SendReportLocked(); |
|
|
|
|
static void OnReportDoneLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
void Orphan() override; |
|
|
|
|
bool IsCurrentReporterOnCall() const { |
|
|
|
|
return this == parent_->reporter_.get(); |
|
|
|
|
} |
|
|
|
|
XdsClient* xds_client() const { return parent_->xds_client(); } |
|
|
|
|
|
|
|
|
|
// The owning LRS call.
|
|
|
|
|
RefCountedPtr<LrsCallState> parent_; |
|
|
|
|
|
|
|
|
|
// The load reporting state.
|
|
|
|
|
const grpc_millis report_interval_; |
|
|
|
|
bool last_report_counters_were_zero_ = false; |
|
|
|
|
bool next_report_timer_callback_pending_ = false; |
|
|
|
|
grpc_timer next_report_timer_; |
|
|
|
|
grpc_closure on_next_report_timer_; |
|
|
|
|
grpc_closure on_report_done_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
grpc_channel* channel() const { return channel_; } |
|
|
|
|
XdsClient* xds_client() const { return xds_client_.get(); } |
|
|
|
|
AdsCallState* ads_calld() const { return ads_calld_->calld(); } |
|
|
|
|
LrsCallState* lrs_calld() const { return lrs_calld_->calld(); } |
|
|
|
|
static void OnInitialRequestSentLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
void MaybeStartAdsCall(); |
|
|
|
|
void StopAdsCall(); |
|
|
|
|
bool IsCurrentCallOnChannel() const; |
|
|
|
|
|
|
|
|
|
void MaybeStartLrsCall(); |
|
|
|
|
void StopLrsCall(); |
|
|
|
|
// The owning RetryableCall<>.
|
|
|
|
|
RefCountedPtr<RetryableCall<LrsCallState>> parent_; |
|
|
|
|
bool seen_response_ = false; |
|
|
|
|
|
|
|
|
|
bool HasActiveAdsCall() const { return ads_calld_->calld() != nullptr; } |
|
|
|
|
// Always non-NULL.
|
|
|
|
|
grpc_call* call_; |
|
|
|
|
|
|
|
|
|
void StartConnectivityWatchLocked(); |
|
|
|
|
void CancelConnectivityWatchLocked(); |
|
|
|
|
// recv_initial_metadata
|
|
|
|
|
grpc_metadata_array initial_metadata_recv_; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class StateWatcher; |
|
|
|
|
// send_message
|
|
|
|
|
grpc_byte_buffer* send_message_payload_ = nullptr; |
|
|
|
|
grpc_closure on_initial_request_sent_; |
|
|
|
|
|
|
|
|
|
// The owning xds client.
|
|
|
|
|
RefCountedPtr<XdsClient> xds_client_; |
|
|
|
|
// recv_message
|
|
|
|
|
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
|
|
|
grpc_closure on_response_received_; |
|
|
|
|
|
|
|
|
|
// The channel and its status.
|
|
|
|
|
grpc_channel* channel_; |
|
|
|
|
bool shutting_down_ = false; |
|
|
|
|
StateWatcher* watcher_ = nullptr; |
|
|
|
|
// recv_trailing_metadata
|
|
|
|
|
grpc_metadata_array trailing_metadata_recv_; |
|
|
|
|
grpc_status_code status_code_; |
|
|
|
|
grpc_slice status_details_; |
|
|
|
|
grpc_closure on_status_received_; |
|
|
|
|
|
|
|
|
|
// The retryable XDS calls.
|
|
|
|
|
OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_; |
|
|
|
|
OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_; |
|
|
|
|
// Load reporting state.
|
|
|
|
|
UniquePtr<char> cluster_name_; |
|
|
|
|
grpc_millis load_reporting_interval_ = 0; |
|
|
|
|
OrphanablePtr<Reporter> reporter_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -374,12 +333,11 @@ grpc_channel_args* BuildXdsChannelArgs(const grpc_channel_args& args) { |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
XdsClient::ChannelState::ChannelState(RefCountedPtr<XdsClient> xds_client, |
|
|
|
|
const char* balancer_name, |
|
|
|
|
const grpc_channel_args& args) |
|
|
|
|
: InternallyRefCounted<ChannelState>(&grpc_xds_client_trace), |
|
|
|
|
xds_client_(std::move(xds_client)) { |
|
|
|
|
grpc_channel_args* new_args = BuildXdsChannelArgs(args); |
|
|
|
|
channel_ = CreateXdsChannel(balancer_name, *new_args); |
|
|
|
|
channel_ = CreateXdsChannel(*xds_client_->bootstrap_, *new_args); |
|
|
|
|
grpc_channel_args_destroy(new_args); |
|
|
|
|
GPR_ASSERT(channel_ != nullptr); |
|
|
|
|
StartConnectivityWatchLocked(); |
|
|
|
@ -401,6 +359,20 @@ void XdsClient::ChannelState::Orphan() { |
|
|
|
|
Unref(DEBUG_LOCATION, "ChannelState+orphaned"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld() |
|
|
|
|
const { |
|
|
|
|
return ads_calld_->calld(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld() |
|
|
|
|
const { |
|
|
|
|
return lrs_calld_->calld(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool XdsClient::ChannelState::HasActiveAdsCall() const { |
|
|
|
|
return ads_calld_->calld() != nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::MaybeStartAdsCall() { |
|
|
|
|
if (ads_calld_ != nullptr) return; |
|
|
|
|
ads_calld_.reset(New<RetryableCall<AdsCallState>>( |
|
|
|
@ -547,8 +519,9 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( |
|
|
|
|
nullptr, GRPC_MILLIS_INF_FUTURE, nullptr); |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
// Init the request payload.
|
|
|
|
|
grpc_slice request_payload_slice = |
|
|
|
|
XdsEdsRequestCreateAndEncode(xds_client()->server_name_.get()); |
|
|
|
|
grpc_slice request_payload_slice = XdsEdsRequestCreateAndEncode( |
|
|
|
|
xds_client()->server_name_.get(), xds_client()->bootstrap_->node(), |
|
|
|
|
xds_client()->build_version_.get()); |
|
|
|
|
send_message_payload_ = |
|
|
|
|
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
|
|
|
|
grpc_slice_unref_internal(request_payload_slice); |
|
|
|
@ -923,8 +896,9 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( |
|
|
|
|
nullptr, GRPC_MILLIS_INF_FUTURE, nullptr); |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
// Init the request payload.
|
|
|
|
|
grpc_slice request_payload_slice = |
|
|
|
|
XdsLrsRequestCreateAndEncode(xds_client()->server_name_.get()); |
|
|
|
|
grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode( |
|
|
|
|
xds_client()->server_name_.get(), xds_client()->bootstrap_->node(), |
|
|
|
|
xds_client()->build_version_.get()); |
|
|
|
|
send_message_payload_ = |
|
|
|
|
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
|
|
|
|
grpc_slice_unref_internal(request_payload_slice); |
|
|
|
@ -1177,19 +1151,48 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { |
|
|
|
|
// XdsClient
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
UniquePtr<char> GenerateBuildVersionString() { |
|
|
|
|
char* build_version_str; |
|
|
|
|
gpr_asprintf(&build_version_str, "gRPC C-core %s %s", grpc_version_string(), |
|
|
|
|
GPR_PLATFORM_STRING); |
|
|
|
|
return UniquePtr<char>(build_version_str); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
XdsClient::XdsClient(grpc_combiner* combiner, |
|
|
|
|
grpc_pollset_set* interested_parties, |
|
|
|
|
const char* balancer_name, StringView server_name, |
|
|
|
|
StringView server_name, |
|
|
|
|
UniquePtr<ServiceConfigWatcherInterface> watcher, |
|
|
|
|
const grpc_channel_args& channel_args) |
|
|
|
|
: combiner_(GRPC_COMBINER_REF(combiner, "xds_client")), |
|
|
|
|
const grpc_channel_args& channel_args, grpc_error** error) |
|
|
|
|
: build_version_(GenerateBuildVersionString()), |
|
|
|
|
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")), |
|
|
|
|
interested_parties_(interested_parties), |
|
|
|
|
bootstrap_(XdsBootstrap::ReadFromFile(error)), |
|
|
|
|
server_name_(server_name.dup()), |
|
|
|
|
service_config_watcher_(std::move(watcher)), |
|
|
|
|
chand_(MakeOrphanable<ChannelState>( |
|
|
|
|
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), balancer_name, |
|
|
|
|
channel_args)) { |
|
|
|
|
// TODO(roth): Start LDS call.
|
|
|
|
|
service_config_watcher_(std::move(watcher)) { |
|
|
|
|
if (*error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_client %p: failed to read bootstrap file: %s", |
|
|
|
|
this, grpc_error_string(*error)); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_client %p: creating channel to %s", this, |
|
|
|
|
bootstrap_->server_uri()); |
|
|
|
|
} |
|
|
|
|
chand_ = MakeOrphanable<ChannelState>( |
|
|
|
|
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args); |
|
|
|
|
if (service_config_watcher_ != nullptr) { |
|
|
|
|
// TODO(juanlishen): Start LDS call and do not return service config
|
|
|
|
|
// until we get the first LDS response.
|
|
|
|
|
GRPC_CLOSURE_INIT(&service_config_notify_, NotifyOnServiceConfig, |
|
|
|
|
Ref().release(), grpc_combiner_scheduler(combiner_)); |
|
|
|
|
GRPC_CLOSURE_SCHED(&service_config_notify_, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
XdsClient::~XdsClient() { GRPC_COMBINER_UNREF(combiner_, "xds_client"); } |
|
|
|
@ -1202,12 +1205,12 @@ void XdsClient::Orphan() { |
|
|
|
|
|
|
|
|
|
void XdsClient::WatchClusterData(StringView cluster, |
|
|
|
|
UniquePtr<ClusterWatcherInterface> watcher) { |
|
|
|
|
// TODO(roth): Implement.
|
|
|
|
|
// TODO(juanlishen): Implement.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::CancelClusterDataWatch(StringView cluster, |
|
|
|
|
ClusterWatcherInterface* watcher) { |
|
|
|
|
// TODO(roth): Implement.
|
|
|
|
|
// TODO(juanlishen): Implement.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::WatchEndpointData(StringView cluster, |
|
|
|
@ -1228,7 +1231,9 @@ void XdsClient::CancelEndpointDataWatch(StringView cluster, |
|
|
|
|
if (it != cluster_state_.endpoint_watchers.end()) { |
|
|
|
|
cluster_state_.endpoint_watchers.erase(it); |
|
|
|
|
} |
|
|
|
|
if (cluster_state_.endpoint_watchers.empty()) chand_->StopAdsCall(); |
|
|
|
|
if (chand_ != nullptr && cluster_state_.endpoint_watchers.empty()) { |
|
|
|
|
chand_->StopAdsCall(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::AddClientStats(StringView cluster, |
|
|
|
@ -1246,7 +1251,9 @@ void XdsClient::RemoveClientStats(StringView cluster, |
|
|
|
|
if (it != cluster_state_.client_stats.end()) { |
|
|
|
|
cluster_state_.client_stats.erase(it); |
|
|
|
|
} |
|
|
|
|
if (cluster_state_.client_stats.empty()) chand_->StopLrsCall(); |
|
|
|
|
if (chand_ != nullptr && cluster_state_.client_stats.empty()) { |
|
|
|
|
chand_->StopLrsCall(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ResetBackoff() { |
|
|
|
@ -1256,9 +1263,6 @@ void XdsClient::ResetBackoff() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::NotifyOnError(grpc_error* error) { |
|
|
|
|
// TODO(roth): Once we implement the full LDS flow, it will not be
|
|
|
|
|
// necessary to check for the service config watcher being non-null,
|
|
|
|
|
// because that will always be true.
|
|
|
|
|
if (service_config_watcher_ != nullptr) { |
|
|
|
|
service_config_watcher_->OnError(GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
@ -1271,6 +1275,27 @@ void XdsClient::NotifyOnError(grpc_error* error) { |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) { |
|
|
|
|
XdsClient* self = static_cast<XdsClient*>(arg); |
|
|
|
|
// TODO(roth): When we add support for WeightedClusters, select the
|
|
|
|
|
// LB policy based on that functionality.
|
|
|
|
|
static const char* json = |
|
|
|
|
"{\n" |
|
|
|
|
" \"loadBalancingConfig\":[\n" |
|
|
|
|
" { \"xds_experimental\":{} }\n" |
|
|
|
|
" ]\n" |
|
|
|
|
"}"; |
|
|
|
|
RefCountedPtr<ServiceConfig> service_config = |
|
|
|
|
ServiceConfig::Create(json, &error); |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
self->service_config_watcher_->OnError(error); |
|
|
|
|
} else { |
|
|
|
|
self->service_config_watcher_->OnServiceConfigChanged( |
|
|
|
|
std::move(service_config)); |
|
|
|
|
} |
|
|
|
|
self->Unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void* XdsClient::ChannelArgCopy(void* p) { |
|
|
|
|
XdsClient* xds_client = static_cast<XdsClient*>(p); |
|
|
|
|
xds_client->Ref().release(); |
|
|
|
|