From 9cf228b8a4f126e84eb62fc271d1b683703a61e5 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 11 Jan 2024 17:47:26 -0800 Subject: [PATCH] [XdsClient] rename a bunch of internal classes, variables, and methods for clarity (#35531) Closes #35531 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35531 from markdroth:xds_client_cleanup 1b197cb90a0f8d21c9facccc583347f527898ee3 PiperOrigin-RevId: 597698967 --- src/core/ext/xds/xds_client.cc | 602 ++++++++++++++++----------------- src/core/ext/xds/xds_client.h | 32 +- 2 files changed, 315 insertions(+), 319 deletions(-) diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 7b125b771d2..f96bc95e5e3 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -70,10 +70,10 @@ TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); // An xds call wrapper that can restart a call upon failure. Holds a ref to // the xds channel. The template parameter is the kind of wrapped xds call. template -class XdsClient::ChannelState::RetryableCall +class XdsClient::XdsChannel::RetryableCall : public InternallyRefCounted> { public: - explicit RetryableCall(WeakRefCountedPtr chand); + explicit RetryableCall(WeakRefCountedPtr xds_channel); // Disable thread-safety analysis because this method is called via // OrphanablePtr<>, but there's no way to pass the lock annotation @@ -82,8 +82,8 @@ class XdsClient::ChannelState::RetryableCall void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - T* calld() const { return calld_.get(); } - ChannelState* chand() const { return chand_.get(); } + T* call() const { return call_.get(); } + XdsChannel* xds_channel() const { return xds_channel_.get(); } bool IsCurrentCallOnChannel() const; @@ -95,9 +95,9 @@ class XdsClient::ChannelState::RetryableCall // The wrapped xds call that talks to the xds server. It's instantiated // every time we start a new call. It's null during call retry backoff. - OrphanablePtr calld_; + OrphanablePtr call_; // The owning xds channel. - WeakRefCountedPtr chand_; + WeakRefCountedPtr xds_channel_; // Retry state. BackOff backoff_; @@ -108,17 +108,18 @@ class XdsClient::ChannelState::RetryableCall }; // Contains an ADS call to the xds server. -class XdsClient::ChannelState::AdsCallState - : public InternallyRefCounted { +class XdsClient::XdsChannel::AdsCall : public InternallyRefCounted { public: // The ctor and dtor should not be used directly. - explicit AdsCallState(RefCountedPtr> parent); + explicit AdsCall(RefCountedPtr> retryable_call); void Orphan() override; - RetryableCall* parent() const { return parent_.get(); } - ChannelState* chand() const { return parent_->chand(); } - XdsClient* xds_client() const { return chand()->xds_client(); } + RetryableCall* retryable_call() const { + return retryable_call_.get(); + } + XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); } + XdsClient* xds_client() const { return xds_channel()->xds_client(); } bool seen_response() const { return seen_response_; } void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name, @@ -131,15 +132,7 @@ class XdsClient::ChannelState::AdsCallState bool HasSubscribedResources() const; private: - class AdsReadDelayHandle : public ReadDelayHandle { - public: - explicit AdsReadDelayHandle(RefCountedPtr ads_call_state) - : ads_call_state_(std::move(ads_call_state)) {} - ~AdsReadDelayHandle() override; - - private: - RefCountedPtr ads_call_state_; - }; + class AdsReadDelayHandle; class AdsResponseParser : public XdsApi::AdsResponseParserInterface { public: @@ -155,8 +148,7 @@ class XdsClient::ChannelState::AdsCallState RefCountedPtr read_delay_handle; }; - explicit AdsResponseParser(AdsCallState* ads_call_state) - : ads_call_state_(ads_call_state) {} + explicit AdsResponseParser(AdsCall* ads_call) : ads_call_(ads_call) {} absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); @@ -172,9 +164,9 @@ class XdsClient::ChannelState::AdsCallState Result TakeResult() { return std::move(result_); } private: - XdsClient* xds_client() const { return ads_call_state_->xds_client(); } + XdsClient* xds_client() const { return ads_call_->xds_client(); } - AdsCallState* ads_call_state_; + AdsCall* ads_call_; const Timestamp update_time_ = Timestamp::Now(); Result result_; }; @@ -197,10 +189,9 @@ class XdsClient::ChannelState::AdsCallState subscription_sent_ = true; } - void MaybeMarkSubscriptionSendComplete( - RefCountedPtr ads_calld) + void MaybeMarkSubscriptionSendComplete(RefCountedPtr ads_call) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { - if (subscription_sent_) MaybeStartTimer(std::move(ads_calld)); + if (subscription_sent_) MaybeStartTimer(std::move(ads_call)); } void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { @@ -210,13 +201,13 @@ class XdsClient::ChannelState::AdsCallState void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { if (timer_handle_.has_value() && - ads_calld_->xds_client()->engine()->Cancel(*timer_handle_)) { + ads_call_->xds_client()->engine()->Cancel(*timer_handle_)) { timer_handle_.reset(); } } private: - void MaybeStartTimer(RefCountedPtr ads_calld) + void MaybeStartTimer(RefCountedPtr ads_call) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { // Don't start timer if we've already either seen the resource or // marked it as non-existing. @@ -237,13 +228,13 @@ class XdsClient::ChannelState::AdsCallState // (a) we already have the resource and (b) the server may // optimize by not resending the resource that we already have. auto& authority_state = - ads_calld->xds_client()->authority_state_map_[name_.authority]; + ads_call->xds_client()->authority_state_map_[name_.authority]; ResourceState& state = authority_state.resource_map[type_][name_.key]; if (state.resource != nullptr) return; // Start timer. - ads_calld_ = std::move(ads_calld); - timer_handle_ = ads_calld_->xds_client()->engine()->RunAfter( - ads_calld_->xds_client()->request_timeout_, + ads_call_ = std::move(ads_call); + timer_handle_ = ads_call_->xds_client()->engine()->RunAfter( + ads_call_->xds_client()->request_timeout_, [self = Ref(DEBUG_LOCATION, "timer")]() { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; @@ -256,32 +247,32 @@ class XdsClient::ChannelState::AdsCallState gpr_log(GPR_INFO, "[xds_client %p] xds server %s: timeout obtaining resource " "{type=%s name=%s} from xds server", - ads_calld_->xds_client(), - ads_calld_->chand()->server_.server_uri().c_str(), + ads_call_->xds_client(), + ads_call_->xds_channel()->server_.server_uri().c_str(), std::string(type_->type_url()).c_str(), XdsClient::ConstructFullXdsResourceName( name_.authority, type_->type_url(), name_.key) .c_str()); } { - MutexLock lock(&ads_calld_->xds_client()->mu_); + MutexLock lock(&ads_call_->xds_client()->mu_); timer_handle_.reset(); resource_seen_ = true; auto& authority_state = - ads_calld_->xds_client()->authority_state_map_[name_.authority]; + ads_call_->xds_client()->authority_state_map_[name_.authority]; ResourceState& state = authority_state.resource_map[type_][name_.key]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist( + ads_call_->xds_client()->NotifyWatchersOnResourceDoesNotExist( state.watchers, ReadDelayHandle::NoWait()); } - ads_calld_->xds_client()->work_serializer_.DrainQueue(); - ads_calld_.reset(); + ads_call_->xds_client()->work_serializer_.DrainQueue(); + ads_call_.reset(); } const XdsResourceType* type_; const XdsResourceName name_; - RefCountedPtr ads_calld_; + RefCountedPtr ads_call_; // True if we have sent the initial subscription request for this // resource on this ADS stream. bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; @@ -296,19 +287,19 @@ class XdsClient::ChannelState::AdsCallState class StreamEventHandler : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler { public: - explicit StreamEventHandler(RefCountedPtr ads_calld) - : ads_calld_(std::move(ads_calld)) {} + explicit StreamEventHandler(RefCountedPtr ads_call) + : ads_call_(std::move(ads_call)) {} - void OnRequestSent(bool ok) override { ads_calld_->OnRequestSent(ok); } + void OnRequestSent(bool ok) override { ads_call_->OnRequestSent(ok); } void OnRecvMessage(absl::string_view payload) override { - ads_calld_->OnRecvMessage(payload); + ads_call_->OnRecvMessage(payload); } void OnStatusReceived(absl::Status status) override { - ads_calld_->OnStatusReceived(std::move(status)); + ads_call_->OnStatusReceived(std::move(status)); } private: - RefCountedPtr ads_calld_; + RefCountedPtr ads_call_; }; struct ResourceTypeState { @@ -337,9 +328,10 @@ class XdsClient::ChannelState::AdsCallState ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); // The owning RetryableCall<>. - RefCountedPtr> parent_; + RefCountedPtr> retryable_call_; - OrphanablePtr call_; + OrphanablePtr + streaming_call_; bool sent_initial_message_ = false; bool seen_response_ = false; @@ -355,46 +347,45 @@ class XdsClient::ChannelState::AdsCallState }; // Contains an LRS call to the xds server. -class XdsClient::ChannelState::LrsCallState - : public InternallyRefCounted { +class XdsClient::XdsChannel::LrsCall : public InternallyRefCounted { public: // The ctor and dtor should not be used directly. - explicit LrsCallState(RefCountedPtr> parent); + explicit LrsCall(RefCountedPtr> retryable_call); void Orphan() override; void MaybeStartReportingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - RetryableCall* parent() { return parent_.get(); } - ChannelState* chand() const { return parent_->chand(); } - XdsClient* xds_client() const { return chand()->xds_client(); } + RetryableCall* retryable_call() { return retryable_call_.get(); } + XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); } + XdsClient* xds_client() const { return xds_channel()->xds_client(); } bool seen_response() const { return seen_response_; } private: class StreamEventHandler : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler { public: - explicit StreamEventHandler(RefCountedPtr lrs_calld) - : lrs_calld_(std::move(lrs_calld)) {} + explicit StreamEventHandler(RefCountedPtr lrs_call) + : lrs_call_(std::move(lrs_call)) {} - void OnRequestSent(bool ok) override { lrs_calld_->OnRequestSent(ok); } + void OnRequestSent(bool ok) override { lrs_call_->OnRequestSent(ok); } void OnRecvMessage(absl::string_view payload) override { - lrs_calld_->OnRecvMessage(payload); + lrs_call_->OnRecvMessage(payload); } void OnStatusReceived(absl::Status status) override { - lrs_calld_->OnStatusReceived(std::move(status)); + lrs_call_->OnStatusReceived(std::move(status)); } private: - RefCountedPtr lrs_calld_; + RefCountedPtr lrs_call_; }; // Reports client-side load stats according to a fixed interval. class Reporter : public InternallyRefCounted { public: - Reporter(RefCountedPtr parent, Duration report_interval) - : parent_(std::move(parent)), report_interval_(report_interval) { + Reporter(RefCountedPtr lrs_call, Duration report_interval) + : lrs_call_(std::move(lrs_call)), report_interval_(report_interval) { ScheduleNextReportLocked(); } @@ -412,12 +403,12 @@ class XdsClient::ChannelState::LrsCallState bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentReporterOnCall() const { - return this == parent_->reporter_.get(); + return this == lrs_call_->reporter_.get(); } - XdsClient* xds_client() const { return parent_->xds_client(); } + XdsClient* xds_client() const { return lrs_call_->xds_client(); } // The owning LRS call. - RefCountedPtr parent_; + RefCountedPtr lrs_call_; // The load reporting state. const Duration report_interval_; @@ -433,9 +424,10 @@ class XdsClient::ChannelState::LrsCallState bool IsCurrentCallOnChannel() const; // The owning RetryableCall<>. - RefCountedPtr> parent_; + RefCountedPtr> retryable_call_; - OrphanablePtr call_; + OrphanablePtr + streaming_call_; bool seen_response_ = false; bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; @@ -448,15 +440,14 @@ class XdsClient::ChannelState::LrsCallState }; // -// XdsClient::ChannelState +// XdsClient::XdsChannel // -XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, - const XdsBootstrap::XdsServer& server) - : DualRefCounted( - GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) - ? "ChannelState" - : nullptr), +XdsClient::XdsChannel::XdsChannel(WeakRefCountedPtr xds_client, + const XdsBootstrap::XdsServer& server) + : DualRefCounted( + GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsChannel" + : nullptr), xds_client_(std::move(xds_client)), server_(server) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -475,19 +466,19 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, if (!status.ok()) SetChannelStatusLocked(std::move(status)); } -XdsClient::ChannelState::~ChannelState() { +XdsClient::XdsChannel::~XdsChannel() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s", xds_client(), this, server_.server_uri().c_str()); } - xds_client_.reset(DEBUG_LOCATION, "ChannelState"); + xds_client_.reset(DEBUG_LOCATION, "XdsChannel"); } // This method should only ever be called when holding the lock, but we can't // use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be // called from DualRefCounted::Unref, which cannot have a lock annotation for // a lock in this subclass. -void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS { +void XdsClient::XdsChannel::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] orphaning xds channel %p for server %s", xds_client(), this, server_.server_uri().c_str()); @@ -495,69 +486,67 @@ void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS { shutting_down_ = true; transport_.reset(); // At this time, all strong refs are removed, remove from channel map to - // prevent subsequent subscription from trying to use this ChannelState as + // prevent subsequent subscription from trying to use this XdsChannel as // it is shutting down. xds_client_->xds_server_channel_map_.erase(&server_); - ads_calld_.reset(); - lrs_calld_.reset(); + ads_call_.reset(); + lrs_call_.reset(); } -void XdsClient::ChannelState::ResetBackoff() { transport_->ResetBackoff(); } +void XdsClient::XdsChannel::ResetBackoff() { transport_->ResetBackoff(); } -XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld() - const { - return ads_calld_->calld(); +XdsClient::XdsChannel::AdsCall* XdsClient::XdsChannel::ads_call() const { + return ads_call_->call(); } -XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld() - const { - return lrs_calld_->calld(); +XdsClient::XdsChannel::LrsCall* XdsClient::XdsChannel::lrs_call() const { + return lrs_call_->call(); } -void XdsClient::ChannelState::MaybeStartLrsCall() { - if (lrs_calld_ != nullptr) return; - lrs_calld_.reset(new RetryableCall( - WeakRef(DEBUG_LOCATION, "ChannelState+lrs"))); +void XdsClient::XdsChannel::MaybeStartLrsCall() { + if (lrs_call_ != nullptr) return; + lrs_call_.reset( + new RetryableCall(WeakRef(DEBUG_LOCATION, "XdsChannel+lrs"))); } -void XdsClient::ChannelState::StopLrsCallLocked() { +void XdsClient::XdsChannel::StopLrsCallLocked() { xds_client_->xds_load_report_server_map_.erase(&server_); - lrs_calld_.reset(); + lrs_call_.reset(); } -void XdsClient::ChannelState::SubscribeLocked(const XdsResourceType* type, - const XdsResourceName& name) { - if (ads_calld_ == nullptr) { +void XdsClient::XdsChannel::SubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name) { + if (ads_call_ == nullptr) { // Start the ADS call if this is the first request. - ads_calld_.reset(new RetryableCall( - WeakRef(DEBUG_LOCATION, "ChannelState+ads"))); - // Note: AdsCallState's ctor will automatically subscribe to all + ads_call_.reset( + new RetryableCall(WeakRef(DEBUG_LOCATION, "XdsChannel+ads"))); + // Note: AdsCall's ctor will automatically subscribe to all // resources that the XdsClient already has watchers for, so we can // return here. return; } // If the ADS call is in backoff state, we don't need to do anything now // because when the call is restarted it will resend all necessary requests. - if (ads_calld() == nullptr) return; + if (ads_call() == nullptr) return; // Subscribe to this resource if the ADS call is active. - ads_calld()->SubscribeLocked(type, name, /*delay_send=*/false); -} - -void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type, - const XdsResourceName& name, - bool delay_unsubscription) { - if (ads_calld_ != nullptr) { - auto* calld = ads_calld_->calld(); - if (calld != nullptr) { - calld->UnsubscribeLocked(type, name, delay_unsubscription); - if (!calld->HasSubscribedResources()) { - ads_calld_.reset(); + ads_call()->SubscribeLocked(type, name, /*delay_send=*/false); +} + +void XdsClient::XdsChannel::UnsubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name, + bool delay_unsubscription) { + if (ads_call_ != nullptr) { + auto* call = ads_call_->call(); + if (call != nullptr) { + call->UnsubscribeLocked(type, name, delay_unsubscription); + if (!call->HasSubscribedResources()) { + ads_call_.reset(); } } } } -void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) { +void XdsClient::XdsChannel::OnConnectivityFailure(absl::Status status) { { MutexLock lock(&xds_client_->mu_); SetChannelStatusLocked(std::move(status)); @@ -565,7 +554,7 @@ void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) { xds_client_->work_serializer_.DrainQueue(); } -void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) { +void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) { if (shutting_down_) return; status = absl::Status(status.code(), absl::StrCat("xDS channel for server ", server_.server_uri(), ": ", @@ -587,7 +576,7 @@ void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) { // Find all watchers for this channel. std::set> watchers; for (const auto& a : xds_client_->authority_state_map_) { // authority - if (a.second.channel_state != this) continue; + if (a.second.xds_channel != this) continue; for (const auto& t : a.second.resource_map) { // type for (const auto& r : t.second) { // resource id for (const auto& w : r.second.watchers) { // watchers @@ -608,13 +597,13 @@ void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) { } // -// XdsClient::ChannelState::RetryableCall<> +// XdsClient::XdsChannel::RetryableCall<> // template -XdsClient::ChannelState::RetryableCall::RetryableCall( - WeakRefCountedPtr chand) - : chand_(std::move(chand)), +XdsClient::XdsChannel::RetryableCall::RetryableCall( + WeakRefCountedPtr xds_channel) + : xds_channel_(std::move(xds_channel)), backoff_(BackOff::Options() .set_initial_backoff(Duration::Seconds( GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS)) @@ -626,42 +615,43 @@ XdsClient::ChannelState::RetryableCall::RetryableCall( } template -void XdsClient::ChannelState::RetryableCall::Orphan() { +void XdsClient::XdsChannel::RetryableCall::Orphan() { shutting_down_ = true; - calld_.reset(); + call_.reset(); if (timer_handle_.has_value()) { - chand()->xds_client()->engine()->Cancel(*timer_handle_); + xds_channel()->xds_client()->engine()->Cancel(*timer_handle_); timer_handle_.reset(); } this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned"); } template -void XdsClient::ChannelState::RetryableCall::OnCallFinishedLocked() { +void XdsClient::XdsChannel::RetryableCall::OnCallFinishedLocked() { // If we saw a response on the current stream, reset backoff. - if (calld_->seen_response()) backoff_.Reset(); - calld_.reset(); + if (call_->seen_response()) backoff_.Reset(); + call_.reset(); // Start retry timer. StartRetryTimerLocked(); } template -void XdsClient::ChannelState::RetryableCall::StartNewCallLocked() { +void XdsClient::XdsChannel::RetryableCall::StartNewCallLocked() { if (shutting_down_) return; - GPR_ASSERT(chand_->transport_ != nullptr); - GPR_ASSERT(calld_ == nullptr); + GPR_ASSERT(xds_channel_->transport_ != nullptr); + GPR_ASSERT(call_ == nullptr); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: start new call from retryable " "call %p", - chand()->xds_client(), chand()->server_.server_uri().c_str(), this); + xds_channel()->xds_client(), + xds_channel()->server_.server_uri().c_str(), this); } - calld_ = MakeOrphanable( + call_ = MakeOrphanable( this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call")); } template -void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { +void XdsClient::XdsChannel::RetryableCall::StartRetryTimerLocked() { if (shutting_down_) return; const Timestamp next_attempt_time = backoff_.NextAttemptTime(); const Duration timeout = @@ -670,10 +660,10 @@ void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: call attempt failed; " "retry timer will fire in %" PRId64 "ms.", - chand()->xds_client(), chand()->server_.server_uri().c_str(), - timeout.millis()); + xds_channel()->xds_client(), + xds_channel()->server_.server_uri().c_str(), timeout.millis()); } - timer_handle_ = chand()->xds_client()->engine()->RunAfter( + timer_handle_ = xds_channel()->xds_client()->engine()->RunAfter( timeout, [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -683,8 +673,8 @@ void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { } template -void XdsClient::ChannelState::RetryableCall::OnRetryTimer() { - MutexLock lock(&chand_->xds_client()->mu_); +void XdsClient::XdsChannel::RetryableCall::OnRetryTimer() { + MutexLock lock(&xds_channel_->xds_client()->mu_); if (timer_handle_.has_value()) { timer_handle_.reset(); if (shutting_down_) return; @@ -692,45 +682,52 @@ void XdsClient::ChannelState::RetryableCall::OnRetryTimer() { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: retry timer fired (retryable " "call: %p)", - chand()->xds_client(), chand()->server_.server_uri().c_str(), - this); + xds_channel()->xds_client(), + xds_channel()->server_.server_uri().c_str(), this); } StartNewCallLocked(); } } // -// XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle +// XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle // -XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle:: - ~AdsReadDelayHandle() { - XdsClient* client = ads_call_state_->xds_client(); - MutexLock lock(&client->mu_); - auto call = ads_call_state_->call_.get(); - if (call != nullptr) { - call->StartRecvMessage(); +class XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle + : public XdsClient::ReadDelayHandle { + public: + explicit AdsReadDelayHandle(RefCountedPtr ads_call) + : ads_call_(std::move(ads_call)) {} + + ~AdsReadDelayHandle() override { + MutexLock lock(&ads_call_->xds_client()->mu_); + auto call = ads_call_->streaming_call_.get(); + if (call != nullptr) call->StartRecvMessage(); } -} + + private: + RefCountedPtr ads_call_; +}; // -// XdsClient::ChannelState::AdsCallState::AdsResponseParser +// XdsClient::XdsChannel::AdsCall::AdsResponseParser // -absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser:: - ProcessAdsResponseFields(AdsResponseFields fields) { +absl::Status +XdsClient::XdsChannel::AdsCall::AdsResponseParser::ProcessAdsResponseFields( + AdsResponseFields fields) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log( GPR_INFO, "[xds_client %p] xds server %s: received ADS response: type_url=%s, " "version=%s, nonce=%s, num_resources=%" PRIuPTR, - ads_call_state_->xds_client(), - ads_call_state_->chand()->server_.server_uri().c_str(), + ads_call_->xds_client(), + ads_call_->xds_channel()->server_.server_uri().c_str(), fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(), fields.num_resources); } result_.type = - ads_call_state_->xds_client()->GetResourceTypeLocked(fields.type_url); + ads_call_->xds_client()->GetResourceTypeLocked(fields.type_url); if (result_.type == nullptr) { return absl::InvalidArgumentError( absl::StrCat("unknown resource type ", fields.type_url)); @@ -739,7 +736,7 @@ absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser:: result_.version = std::move(fields.version); result_.nonce = std::move(fields.nonce); result_.read_delay_handle = - MakeRefCounted(ads_call_state_->Ref()); + MakeRefCounted(ads_call_->Ref()); return absl::OkStatus(); } @@ -769,7 +766,7 @@ void UpdateResourceMetadataNacked(const std::string& version, } // namespace -void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( +void XdsClient::XdsChannel::AdsCall::AdsResponseParser::ParseResource( upb_Arena* arena, size_t idx, absl::string_view type_url, absl::string_view resource_name, absl::string_view serialized_resource) { std::string error_prefix = absl::StrCat( @@ -784,7 +781,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( } // Parse the resource. XdsResourceType::DecodeContext context = { - xds_client(), ads_call_state_->chand()->server_, &grpc_xds_client_trace, + xds_client(), ads_call_->xds_channel()->server_, &grpc_xds_client_trace, xds_client()->def_pool_.ptr(), arena}; XdsResourceType::DecodeResult decode_result = result_.type->Decode(context, serialized_resource); @@ -818,8 +815,8 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( return; } // Cancel resource-does-not-exist timer, if needed. - auto timer_it = ads_call_state_->state_map_.find(result_.type); - if (timer_it != ads_call_state_->state_map_.end()) { + auto timer_it = ads_call_->state_map_.find(result_.type); + if (timer_it != ads_call_->state_map_.end()) { auto it = timer_it->second.subscribed_resources.find( parsed_resource_name->authority); if (it != timer_it->second.subscribed_resources.end()) { @@ -861,7 +858,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( "resource for which we previously ignored a deletion: type %s " "name %s", xds_client(), - ads_call_state_->chand()->server_.server_uri().c_str(), + ads_call_->xds_channel()->server_.server_uri().c_str(), std::string(type_url).c_str(), std::string(resource_name).c_str()); resource_state.ignored_deletion = false; } @@ -907,48 +904,47 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( DEBUG_LOCATION); } -void XdsClient::ChannelState::AdsCallState::AdsResponseParser:: +void XdsClient::XdsChannel::AdsCall::AdsResponseParser:: ResourceWrapperParsingFailed(size_t idx, absl::string_view message) { result_.errors.emplace_back( absl::StrCat("resource index ", idx, ": ", message)); } // -// XdsClient::ChannelState::AdsCallState +// XdsClient::XdsChannel::AdsCall // -XdsClient::ChannelState::AdsCallState::AdsCallState( - RefCountedPtr> parent) - : InternallyRefCounted( - GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) - ? "AdsCallState" - : nullptr), - parent_(std::move(parent)) { +XdsClient::XdsChannel::AdsCall::AdsCall( + RefCountedPtr> retryable_call) + : InternallyRefCounted( + GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "AdsCall" + : nullptr), + retryable_call_(std::move(retryable_call)) { GPR_ASSERT(xds_client() != nullptr); // Init the ADS call. const char* method = "/envoy.service.discovery.v3.AggregatedDiscoveryService/" "StreamAggregatedResources"; - call_ = chand()->transport_->CreateStreamingCall( + streaming_call_ = xds_channel()->transport_->CreateStreamingCall( method, std::make_unique( // Passing the initial ref here. This ref will go away when // the StreamEventHandler is destroyed. - RefCountedPtr(this))); - GPR_ASSERT(call_ != nullptr); + RefCountedPtr(this))); + GPR_ASSERT(streaming_call_ != nullptr); // Start the call. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: starting ADS call " - "(calld: %p, call: %p)", - xds_client(), chand()->server_.server_uri().c_str(), this, - call_.get()); + "(ads_call: %p, streaming_call: %p)", + xds_client(), xds_channel()->server_.server_uri().c_str(), this, + streaming_call_.get()); } // If this is a reconnect, add any necessary subscriptions from what's // already in the cache. for (const auto& a : xds_client()->authority_state_map_) { const std::string& authority = a.first; // Skip authorities that are not using this xDS channel. - if (a.second.channel_state != chand()) continue; + if (a.second.xds_channel != xds_channel()) continue; for (const auto& t : a.second.resource_map) { const XdsResourceType* type = t.first; for (const auto& r : t.second) { @@ -961,18 +957,19 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( for (const auto& p : state_map_) { SendMessageLocked(p.first); } - call_->StartRecvMessage(); + streaming_call_->StartRecvMessage(); } -void XdsClient::ChannelState::AdsCallState::Orphan() { +void XdsClient::XdsChannel::AdsCall::Orphan() { state_map_.clear(); // Note that the initial ref is held by the StreamEventHandler, which - // will be destroyed when call_ is destroyed, which may not happen - // here, since there may be other refs held to call_ by internal callbacks. - call_.reset(); + // will be destroyed when streaming_call_ is destroyed, which may not happen + // here, since there may be other refs held to streaming_call_ by internal + // callbacks. + streaming_call_.reset(); } -void XdsClient::ChannelState::AdsCallState::SendMessageLocked( +void XdsClient::XdsChannel::AdsCall::SendMessageLocked( const XdsResourceType* type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { // Buffer message sending if an existing message is in flight. @@ -982,24 +979,25 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } auto& state = state_map_[type]; std::string serialized_message = xds_client()->api_.CreateAdsRequest( - type->type_url(), chand()->resource_type_version_map_[type], state.nonce, - ResourceNamesForRequest(type), state.status, !sent_initial_message_); + type->type_url(), xds_channel()->resource_type_version_map_[type], + state.nonce, ResourceNamesForRequest(type), state.status, + !sent_initial_message_); sent_initial_message_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: sending ADS request: type=%s " "version=%s nonce=%s error=%s", - xds_client(), chand()->server_.server_uri().c_str(), + xds_client(), xds_channel()->server_.server_uri().c_str(), std::string(type->type_url()).c_str(), - chand()->resource_type_version_map_[type].c_str(), + xds_channel()->resource_type_version_map_[type].c_str(), state.nonce.c_str(), state.status.ToString().c_str()); } state.status = absl::OkStatus(); - call_->SendMessage(std::move(serialized_message)); + streaming_call_->SendMessage(std::move(serialized_message)); send_message_pending_ = type; } -void XdsClient::ChannelState::AdsCallState::SubscribeLocked( +void XdsClient::XdsChannel::AdsCall::SubscribeLocked( const XdsResourceType* type, const XdsResourceName& name, bool delay_send) { auto& state = state_map_[type].subscribed_resources[name.authority][name.key]; if (state == nullptr) { @@ -1008,7 +1006,7 @@ void XdsClient::ChannelState::AdsCallState::SubscribeLocked( } } -void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked( +void XdsClient::XdsChannel::AdsCall::UnsubscribeLocked( const XdsResourceType* type, const XdsResourceName& name, bool delay_unsubscription) { auto& type_state_map = state_map_[type]; @@ -1025,14 +1023,14 @@ void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked( } } -bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { +bool XdsClient::XdsChannel::AdsCall::HasSubscribedResources() const { for (const auto& p : state_map_) { if (!p.second.subscribed_resources.empty()) return true; } return false; } -void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) { +void XdsClient::XdsChannel::AdsCall::OnRequestSent(bool ok) { MutexLock lock(&xds_client()->mu_); // For each resource that was in the message we just sent, start the // resource timer if needed. @@ -1064,8 +1062,7 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) { } } -void XdsClient::ChannelState::AdsCallState::OnRecvMessage( - absl::string_view payload) { +void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) { // Needs to be destroyed after the mutex is released. RefCountedPtr read_delay_handle; { @@ -1082,11 +1079,11 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( gpr_log(GPR_ERROR, "[xds_client %p] xds server %s: error parsing ADS response (%s) " "-- ignoring", - xds_client(), chand()->server_.server_uri().c_str(), + xds_client(), xds_channel()->server_.server_uri().c_str(), status.ToString().c_str()); } else { seen_response_ = true; - chand()->status_ = absl::OkStatus(); + xds_channel()->status_ = absl::OkStatus(); // Update nonce. auto& state = state_map_[result.type]; state.nonce = result.nonce; @@ -1099,7 +1096,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( "[xds_client %p] xds server %s: ADS response invalid for " "resource " "type %s version %s, will NACK: nonce=%s status=%s", - xds_client(), chand()->server_.server_uri().c_str(), + xds_client(), xds_channel()->server_.server_uri().c_str(), result.type_url.c_str(), result.version.c_str(), state.nonce.c_str(), state.status.ToString().c_str()); } @@ -1109,7 +1106,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( const std::string& authority = a.first; AuthorityState& authority_state = a.second; // Skip authorities that are not using this xDS channel. - if (authority_state.channel_state != chand()) continue; + if (authority_state.xds_channel != xds_channel()) continue; auto seen_authority_it = result.resources_seen.find(authority); // Find this resource type. auto type_it = authority_state.resource_map.find(result.type); @@ -1129,12 +1126,13 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( // that the resource does not exist. For that case, we rely on // the request timeout instead. if (resource_state.resource == nullptr) continue; - if (chand()->server_.IgnoreResourceDeletion()) { + if (xds_channel()->server_.IgnoreResourceDeletion()) { if (!resource_state.ignored_deletion) { gpr_log(GPR_ERROR, "[xds_client %p] xds server %s: ignoring deletion " "for resource type %s name %s", - xds_client(), chand()->server_.server_uri().c_str(), + xds_client(), + xds_channel()->server_.server_uri().c_str(), result.type_url.c_str(), XdsClient::ConstructFullXdsResourceName( authority, result.type_url.c_str(), resource_key) @@ -1154,13 +1152,13 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( } // If we had valid resources or the update was empty, update the version. if (result.have_valid_resources || result.errors.empty()) { - chand()->resource_type_version_map_[result.type] = + xds_channel()->resource_type_version_map_[result.type] = std::move(result.version); // Start load reporting if needed. - auto& lrs_call = chand()->lrs_calld_; - if (lrs_call != nullptr) { - LrsCallState* lrs_calld = lrs_call->calld(); - if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); + auto& lrs_retryable_call = xds_channel()->lrs_call_; + if (lrs_retryable_call != nullptr) { + LrsCall* lrs_call = lrs_retryable_call->call(); + if (lrs_call != nullptr) lrs_call->MaybeStartReportingLocked(); } } // Send ACK or NACK. @@ -1170,16 +1168,16 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( xds_client()->work_serializer_.DrainQueue(); } -void XdsClient::ChannelState::AdsCallState::OnStatusReceived( - absl::Status status) { +void XdsClient::XdsChannel::AdsCall::OnStatusReceived(absl::Status status) { { MutexLock lock(&xds_client()->mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: ADS call status received " - "(chand=%p, ads_calld=%p, call=%p): %s", - xds_client(), chand()->server_.server_uri().c_str(), chand(), - this, call_.get(), status.ToString().c_str()); + "(xds_channel=%p, ads_call=%p, streaming_call=%p): %s", + xds_client(), xds_channel()->server_.server_uri().c_str(), + xds_channel(), this, streaming_call_.get(), + status.ToString().c_str()); } // Cancel any does-not-exist timers that may be pending. for (const auto& p : state_map_) { @@ -1192,12 +1190,12 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( // Ignore status from a stale call. if (IsCurrentCallOnChannel()) { // Try to restart the call. - parent_->OnCallFinishedLocked(); + retryable_call_->OnCallFinishedLocked(); // If we didn't receive a response on the stream, report the // stream failure as a connectivity failure, which will report the // error to all watchers of resources on this channel. if (!seen_response_) { - chand()->SetChannelStatusLocked(absl::UnavailableError( + xds_channel()->SetChannelStatusLocked(absl::UnavailableError( absl::StrCat("xDS call failed with no responses received; status: ", status.ToString()))); } @@ -1206,15 +1204,15 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( xds_client()->work_serializer_.DrainQueue(); } -bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { +bool XdsClient::XdsChannel::AdsCall::IsCurrentCallOnChannel() const { // If the retryable ADS call is null (which only happens when the xds // channel is shutting down), all the ADS calls are stale. - if (chand()->ads_calld_ == nullptr) return false; - return this == chand()->ads_calld_->calld(); + if (xds_channel()->ads_call_ == nullptr) return false; + return this == xds_channel()->ads_call_->call(); } std::vector -XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( +XdsClient::XdsChannel::AdsCall::ResourceNamesForRequest( const XdsResourceType* type) { std::vector resource_names; auto it = state_map_.find(type); @@ -1234,10 +1232,10 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( } // -// XdsClient::ChannelState::LrsCallState::Reporter +// XdsClient::XdsChannel::LrsCall::Reporter // -void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { +void XdsClient::XdsChannel::LrsCall::Reporter::Orphan() { if (timer_handle_.has_value() && xds_client()->engine()->Cancel(*timer_handle_)) { timer_handle_.reset(); @@ -1245,12 +1243,11 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { } } -void XdsClient::ChannelState::LrsCallState::Reporter:: - ScheduleNextReportLocked() { +void XdsClient::XdsChannel::LrsCall::Reporter::ScheduleNextReportLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] xds server %s: scheduling load report timer", - xds_client(), parent_->chand()->server_.server_uri().c_str()); + gpr_log( + GPR_INFO, "[xds_client %p] xds server %s: scheduling load report timer", + xds_client(), lrs_call_->xds_channel()->server_.server_uri().c_str()); } timer_handle_ = xds_client()->engine()->RunAfter(report_interval_, [this]() { ApplicationCallbackExecCtx callback_exec_ctx; @@ -1261,7 +1258,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter:: }); } -bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer() { +bool XdsClient::XdsChannel::LrsCall::Reporter::OnNextReportTimer() { MutexLock lock(&xds_client()->mu_); timer_handle_.reset(); if (!IsCurrentReporterOnCall()) return true; @@ -1285,22 +1282,22 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) { } // namespace -bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { +bool XdsClient::XdsChannel::LrsCall::Reporter::SendReportLocked() { // Construct snapshot from all reported stats. XdsApi::ClusterLoadReportMap snapshot = - xds_client()->BuildLoadReportSnapshotLocked(parent_->chand()->server_, - parent_->send_all_clusters_, - parent_->cluster_names_); + xds_client()->BuildLoadReportSnapshotLocked( + lrs_call_->xds_channel()->server_, lrs_call_->send_all_clusters_, + lrs_call_->cluster_names_); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. const bool old_val = last_report_counters_were_zero_; last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); if (old_val && last_report_counters_were_zero_) { auto it = xds_client()->xds_load_report_server_map_.find( - &parent_->chand()->server_); + &lrs_call_->xds_channel()->server_); if (it == xds_client()->xds_load_report_server_map_.end() || it->second.load_report_map.empty()) { - it->second.channel_state->StopLrsCallLocked(); + it->second.xds_channel->StopLrsCallLocked(); return true; } ScheduleNextReportLocked(); @@ -1309,12 +1306,12 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { // Send a request that contains the snapshot. std::string serialized_payload = xds_client()->api_.CreateLrsRequest(std::move(snapshot)); - parent_->call_->SendMessage(std::move(serialized_payload)); - parent_->send_message_pending_ = true; + lrs_call_->streaming_call_->SendMessage(std::move(serialized_payload)); + lrs_call_->send_message_pending_ = true; return false; } -void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() { +void XdsClient::XdsChannel::LrsCall::Reporter::OnReportDoneLocked() { // If a reporter starts a send_message op, then the reporting interval // changes and we destroy that reporter and create a new one, and then // the send_message op started by the old reporter finishes, this @@ -1324,11 +1321,11 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() { if (timer_handle_.has_value()) return; // If there are no more registered stats to report, cancel the call. auto it = xds_client()->xds_load_report_server_map_.find( - &parent_->chand()->server_); + &lrs_call_->xds_channel()->server_); if (it == xds_client()->xds_load_report_server_map_.end()) return; if (it->second.load_report_map.empty()) { - if (it->second.channel_state != nullptr) { - it->second.channel_state->StopLrsCallLocked(); + if (it->second.xds_channel != nullptr) { + it->second.xds_channel->StopLrsCallLocked(); } return; } @@ -1337,77 +1334,77 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() { } // -// XdsClient::ChannelState::LrsCallState +// XdsClient::XdsChannel::LrsCall // -XdsClient::ChannelState::LrsCallState::LrsCallState( - RefCountedPtr> parent) - : InternallyRefCounted( - GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) - ? "LrsCallState" - : nullptr), - parent_(std::move(parent)) { +XdsClient::XdsChannel::LrsCall::LrsCall( + RefCountedPtr> retryable_call) + : InternallyRefCounted( + GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "LrsCall" + : nullptr), + retryable_call_(std::move(retryable_call)) { // Init the LRS call. Note that the call will progress every time there's // activity in xds_client()->interested_parties_, which is comprised of // the polling entities from client_channel. GPR_ASSERT(xds_client() != nullptr); const char* method = "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"; - call_ = chand()->transport_->CreateStreamingCall( + streaming_call_ = xds_channel()->transport_->CreateStreamingCall( method, std::make_unique( // Passing the initial ref here. This ref will go away when // the StreamEventHandler is destroyed. - RefCountedPtr(this))); - GPR_ASSERT(call_ != nullptr); + RefCountedPtr(this))); + GPR_ASSERT(streaming_call_ != nullptr); // Start the call. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, - "[xds_client %p] xds server %s: starting LRS call (calld=%p, " - "call=%p)", - xds_client(), chand()->server_.server_uri().c_str(), this, - call_.get()); + "[xds_client %p] xds server %s: starting LRS call (lrs_call=%p, " + "streaming_call=%p)", + xds_client(), xds_channel()->server_.server_uri().c_str(), this, + streaming_call_.get()); } // Send the initial request. std::string serialized_payload = xds_client()->api_.CreateLrsInitialRequest(); - call_->SendMessage(std::move(serialized_payload)); + streaming_call_->SendMessage(std::move(serialized_payload)); send_message_pending_ = true; - call_->StartRecvMessage(); + streaming_call_->StartRecvMessage(); } -void XdsClient::ChannelState::LrsCallState::Orphan() { +void XdsClient::XdsChannel::LrsCall::Orphan() { reporter_.reset(); // Note that the initial ref is held by the StreamEventHandler, which - // will be destroyed when call_ is destroyed, which may not happen - // here, since there may be other refs held to call_ by internal callbacks. - call_.reset(); + // will be destroyed when streaming_call_ is destroyed, which may not happen + // here, since there may be other refs held to streaming_call_ by internal + // callbacks. + streaming_call_.reset(); } -void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { +void XdsClient::XdsChannel::LrsCall::MaybeStartReportingLocked() { // Don't start again if already started. if (reporter_ != nullptr) return; // Don't start if the previous send_message op (of the initial request or // the last report of the previous reporter) hasn't completed. - if (call_ != nullptr && send_message_pending_) return; + if (streaming_call_ != nullptr && send_message_pending_) return; // Don't start if no LRS response has arrived. if (!seen_response()) return; // Don't start if the ADS call hasn't received any valid response. Note that // this must be the first channel because it is the current channel but its // ADS call hasn't seen any response. - if (chand()->ads_calld_ == nullptr || - chand()->ads_calld_->calld() == nullptr || - !chand()->ads_calld_->calld()->seen_response()) { + if (xds_channel()->ads_call_ == nullptr || + xds_channel()->ads_call_->call() == nullptr || + !xds_channel()->ads_call_->call()->seen_response()) { return; } // Start reporting. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: creating load reporter", - xds_client(), chand()->server_.server_uri().c_str()); + xds_client(), xds_channel()->server_.server_uri().c_str()); } reporter_ = MakeOrphanable( Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_); } -void XdsClient::ChannelState::LrsCallState::OnRequestSent(bool /*ok*/) { +void XdsClient::XdsChannel::LrsCall::OnRequestSent(bool /*ok*/) { MutexLock lock(&xds_client()->mu_); send_message_pending_ = false; if (reporter_ != nullptr) { @@ -1417,14 +1414,13 @@ void XdsClient::ChannelState::LrsCallState::OnRequestSent(bool /*ok*/) { } } -void XdsClient::ChannelState::LrsCallState::OnRecvMessage( - absl::string_view payload) { +void XdsClient::XdsChannel::LrsCall::OnRecvMessage(absl::string_view payload) { MutexLock lock(&xds_client()->mu_); // If we're no longer the current call, ignore the result. if (!IsCurrentCallOnChannel()) return; // Start recv after any code branch - auto cleanup = - absl::MakeCleanup([call = call_.get()]() { call->StartRecvMessage(); }); + auto cleanup = absl::MakeCleanup( + [call = streaming_call_.get()]() { call->StartRecvMessage(); }); // Parse the response. bool send_all_clusters = false; std::set new_cluster_names; @@ -1435,7 +1431,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( if (!status.ok()) { gpr_log(GPR_ERROR, "[xds_client %p] xds server %s: LRS response parsing failed: %s", - xds_client(), chand()->server_.server_uri().c_str(), + xds_client(), xds_channel()->server_.server_uri().c_str(), status.ToString().c_str()); return; } @@ -1446,7 +1442,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( "[xds_client %p] xds server %s: LRS response received, %" PRIuPTR " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64 "ms", - xds_client(), chand()->server_.server_uri().c_str(), + xds_client(), xds_channel()->server_.server_uri().c_str(), new_cluster_names.size(), send_all_clusters, new_load_reporting_interval.millis()); size_t i = 0; @@ -1463,7 +1459,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( gpr_log(GPR_INFO, "[xds_client %p] xds server %s: increased load_report_interval " "to minimum value %dms", - xds_client(), chand()->server_.server_uri().c_str(), + xds_client(), xds_channel()->server_.server_uri().c_str(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); } } @@ -1475,7 +1471,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( gpr_log(GPR_INFO, "[xds_client %p] xds server %s: incoming LRS response identical " "to current, ignoring.", - xds_client(), chand()->server_.server_uri().c_str()); + xds_client(), xds_channel()->server_.server_uri().c_str()); } return; } @@ -1489,28 +1485,28 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage( MaybeStartReportingLocked(); } -void XdsClient::ChannelState::LrsCallState::OnStatusReceived( - absl::Status status) { +void XdsClient::XdsChannel::LrsCall::OnStatusReceived(absl::Status status) { MutexLock lock(&xds_client()->mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: LRS call status received " - "(chand=%p, calld=%p, call=%p): %s", - xds_client(), chand()->server_.server_uri().c_str(), chand(), this, - call_.get(), status.ToString().c_str()); + "(xds_channel=%p, lrs_call=%p, streaming_call=%p): %s", + xds_client(), xds_channel()->server_.server_uri().c_str(), + xds_channel(), this, streaming_call_.get(), + status.ToString().c_str()); } // Ignore status from a stale call. if (IsCurrentCallOnChannel()) { // Try to restart the call. - parent_->OnCallFinishedLocked(); + retryable_call_->OnCallFinishedLocked(); } } -bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { +bool XdsClient::XdsChannel::LrsCall::IsCurrentCallOnChannel() const { // If the retryable LRS call is null (which only happens when the xds // channel is shutting down), all the LRS calls are stale. - if (chand()->lrs_calld_ == nullptr) return false; - return this == chand()->lrs_calld_->calld(); + if (xds_channel()->lrs_call_ == nullptr) return false; + return this == xds_channel()->lrs_call_->call(); } // @@ -1561,24 +1557,24 @@ void XdsClient::Orphan() { invalid_watchers_.clear(); // We may still be sending lingering queued load report data, so don't // just clear the load reporting map, but we do want to clear the refs - // we're holding to the ChannelState objects, to make sure that + // we're holding to the XdsChannel objects, to make sure that // everything shuts down properly. for (auto& p : xds_load_report_server_map_) { - p.second.channel_state.reset(DEBUG_LOCATION, "XdsClient::Orphan()"); + p.second.xds_channel.reset(DEBUG_LOCATION, "XdsClient::Orphan()"); } } -RefCountedPtr XdsClient::GetOrCreateChannelStateLocked( +RefCountedPtr XdsClient::GetOrCreateXdsChannelLocked( const XdsBootstrap::XdsServer& server, const char* reason) { auto it = xds_server_channel_map_.find(&server); if (it != xds_server_channel_map_.end()) { return it->second->Ref(DEBUG_LOCATION, reason); } // Channel not found, so create a new one. - auto channel_state = MakeRefCounted( - WeakRef(DEBUG_LOCATION, "ChannelState"), server); - xds_server_channel_map_[&server] = channel_state.get(); - return channel_state; + auto xds_channel = + MakeRefCounted(WeakRef(DEBUG_LOCATION, "XdsChannel"), server); + xds_server_channel_map_[&server] = xds_channel.get(); + return xds_channel; } void XdsClient::WatchResource(const XdsResourceType* type, @@ -1687,11 +1683,11 @@ void XdsClient::WatchResource(const XdsResourceType* type, } // If the authority doesn't yet have a channel, set it, creating it if // needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(*xds_server, "start watch"); + if (authority_state.xds_channel == nullptr) { + authority_state.xds_channel = + GetOrCreateXdsChannelLocked(*xds_server, "start watch"); } - absl::Status channel_status = authority_state.channel_state->status(); + absl::Status channel_status = authority_state.xds_channel->status(); if (!channel_status.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, @@ -1706,7 +1702,7 @@ void XdsClient::WatchResource(const XdsResourceType* type, }, DEBUG_LOCATION); } - authority_state.channel_state->SubscribeLocked(type, *resource_name); + authority_state.xds_channel->SubscribeLocked(type, *resource_name); } work_serializer_.DrainQueue(); } @@ -1744,13 +1740,13 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type, this, std::string(type->type_url()).c_str(), std::string(name).c_str()); } - authority_state.channel_state->UnsubscribeLocked(type, *resource_name, - delay_unsubscription); + authority_state.xds_channel->UnsubscribeLocked(type, *resource_name, + delay_unsubscription); type_map.erase(resource_it); if (type_map.empty()) { authority_state.resource_map.erase(type_it); if (authority_state.resource_map.empty()) { - authority_state.channel_state.reset(); + authority_state.xds_channel.reset(); } } } @@ -1834,9 +1830,9 @@ RefCountedPtr XdsClient::AddClusterDropStats( // in the load_report_map_ key, so that they have the same lifetime. auto server_it = xds_load_report_server_map_.emplace(server, LoadReportServer()).first; - if (server_it->second.channel_state == nullptr) { - server_it->second.channel_state = GetOrCreateChannelStateLocked( - *server, "load report map (drop stats)"); + if (server_it->second.xds_channel == nullptr) { + server_it->second.xds_channel = + GetOrCreateXdsChannelLocked(*server, "load report map (drop stats)"); } auto load_report_it = server_it->second.load_report_map .emplace(std::move(key), LoadReportState()) @@ -1856,7 +1852,7 @@ RefCountedPtr XdsClient::AddClusterDropStats( load_report_it->first.second /*eds_service_name*/); load_report_state.drop_stats = cluster_drop_stats.get(); } - server_it->second.channel_state->MaybeStartLrsCall(); + server_it->second.xds_channel->MaybeStartLrsCall(); } work_serializer_.DrainQueue(); return cluster_drop_stats; @@ -1902,8 +1898,8 @@ RefCountedPtr XdsClient::AddClusterLocalityStats( // in the load_report_map_ key, so that they have the same lifetime. auto server_it = xds_load_report_server_map_.emplace(server, LoadReportServer()).first; - if (server_it->second.channel_state == nullptr) { - server_it->second.channel_state = GetOrCreateChannelStateLocked( + if (server_it->second.xds_channel == nullptr) { + server_it->second.xds_channel = GetOrCreateXdsChannelLocked( *server, "load report map (locality stats)"); } auto load_report_it = server_it->second.load_report_map @@ -1927,7 +1923,7 @@ RefCountedPtr XdsClient::AddClusterLocalityStats( std::move(locality)); locality_state.locality_stats = cluster_locality_stats.get(); } - server_it->second.channel_state->MaybeStartLrsCall(); + server_it->second.xds_channel->MaybeStartLrsCall(); } work_serializer_.DrainQueue(); return cluster_locality_stats; diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 779198c581e..12c9d094b04 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -179,23 +179,23 @@ class XdsClient : public DualRefCounted { // Contains a channel to the xds server and all the data related to the // channel. Holds a ref to the xds client object. - class ChannelState : public DualRefCounted { + class XdsChannel : public DualRefCounted { public: template class RetryableCall; - class AdsCallState; - class LrsCallState; + class AdsCall; + class LrsCall; - ChannelState(WeakRefCountedPtr xds_client, - const XdsBootstrap::XdsServer& server); - ~ChannelState() override; + XdsChannel(WeakRefCountedPtr xds_client, + const XdsBootstrap::XdsServer& server); + ~XdsChannel() override; void Orphan() override; XdsClient* xds_client() const { return xds_client_.get(); } - AdsCallState* ads_calld() const; - LrsCallState* lrs_calld() const; + AdsCall* ads_call() const; + LrsCall* lrs_call() const; void ResetBackoff(); @@ -231,9 +231,9 @@ class XdsClient : public DualRefCounted { bool shutting_down_ = false; - // The retryable XDS calls. - OrphanablePtr> ads_calld_; - OrphanablePtr> lrs_calld_; + // The retryable ADS and LRS calls. + OrphanablePtr> ads_call_; + OrphanablePtr> lrs_call_; // Stores the most recent accepted resource version for each resource type. std::map @@ -252,7 +252,7 @@ class XdsClient : public DualRefCounted { }; struct AuthorityState { - RefCountedPtr channel_state; + RefCountedPtr xds_channel; std::map> resource_map; }; @@ -277,7 +277,7 @@ class XdsClient : public DualRefCounted { LoadReportState>; struct LoadReportServer { - RefCountedPtr channel_state; + RefCountedPtr xds_channel; LoadReportMap load_report_map; }; @@ -309,7 +309,7 @@ class XdsClient : public DualRefCounted { const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, const std::set& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - RefCountedPtr GetOrCreateChannelStateLocked( + RefCountedPtr GetOrCreateXdsChannelLocked( const XdsBootstrap::XdsServer& server, const char* reason) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -330,8 +330,8 @@ class XdsClient : public DualRefCounted { // Map of existing xDS server channels. // Key is owned by the bootstrap config. - std::map - xds_server_channel_map_ ABSL_GUARDED_BY(mu_); + std::map xds_server_channel_map_ + ABSL_GUARDED_BY(mu_); std::map authority_state_map_ ABSL_GUARDED_BY(mu_);