diff --git a/src/core/ext/xds/certificate_provider_store.h b/src/core/ext/xds/certificate_provider_store.h index 0954bc5e809..fb6ca72d6d0 100644 --- a/src/core/ext/xds/certificate_provider_store.h +++ b/src/core/ext/xds/certificate_provider_store.h @@ -92,7 +92,7 @@ class CertificateProviderStore }; RefCountedPtr CreateCertificateProviderLocked( - absl::string_view key); + absl::string_view key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // Releases a previously created certificate provider from the certificate // provider map if the value matches \a wrapper. @@ -101,10 +101,10 @@ class CertificateProviderStore Mutex mu_; // Map of plugin configurations - PluginDefinitionMap plugin_config_map_; + PluginDefinitionMap plugin_config_map_ ABSL_GUARDED_BY(mu_); // Underlying map for the providers. std::map - certificate_providers_map_; + certificate_providers_map_ ABSL_GUARDED_BY(mu_); }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_certificate_provider.h b/src/core/ext/xds/xds_certificate_provider.h index 6b190cad53c..2f508830f76 100644 --- a/src/core/ext/xds/xds_certificate_provider.h +++ b/src/core/ext/xds/xds_certificate_provider.h @@ -127,9 +127,11 @@ class XdsCertificateProvider : public grpc_tls_certificate_provider { void WatchStatusCallback(std::string cert_name, bool root_being_watched, bool identity_being_watched); + RefCountedPtr distributor_; + Mutex mu_; std::map> - certificate_state_map_; + certificate_state_map_ ABSL_GUARDED_BY(mu_); // Use a separate mutex for san_matchers_ to avoid deadlocks since // san_matchers_ needs to be accessed when a handshake is being done and we @@ -141,9 +143,7 @@ class XdsCertificateProvider : public grpc_tls_certificate_provider { // subject_alternative_names_matchers() Mutex san_matchers_mu_; std::map> - san_matcher_map_; - - RefCountedPtr distributor_; + san_matcher_map_ ABSL_GUARDED_BY(san_matchers_mu_); }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index f0b576206bc..666e5359b2e 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -1,20 +1,18 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #include @@ -72,9 +70,9 @@ TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); namespace { Mutex* g_mu = nullptr; -const grpc_channel_args* g_channel_args = nullptr; -XdsClient* g_xds_client = nullptr; -char* g_fallback_bootstrap_config = nullptr; +const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; +XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; +char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; } // namespace @@ -135,9 +133,11 @@ class XdsClient::ChannelState::AdsCallState XdsClient* xds_client() const { return chand()->xds_client(); } bool seen_response() const { return seen_response_; } - void Subscribe(const std::string& type_url, const std::string& name); - void Unsubscribe(const std::string& type_url, const std::string& name, - bool delay_unsubscription); + void SubscribeLocked(const std::string& type_url, const std::string& name) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void UnsubscribeLocked(const std::string& type_url, const std::string& name, + bool delay_unsubscription) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool HasSubscribedResources() const; @@ -188,7 +188,8 @@ class XdsClient::ChannelState::AdsCallState self->Unref(DEBUG_LOCATION, "timer"); } - void OnTimerLocked(grpc_error* error) { + void OnTimerLocked(grpc_error* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { if (error == GRPC_ERROR_NONE && timer_pending_) { timer_pending_ = false; grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( @@ -257,23 +258,31 @@ class XdsClient::ChannelState::AdsCallState subscribed_resources; }; - void SendMessageLocked(const std::string& type_url); - - void AcceptLdsUpdate(std::string version, grpc_millis update_time, - XdsApi::LdsUpdateMap lds_update_map); - void AcceptRdsUpdate(std::string version, grpc_millis update_time, - XdsApi::RdsUpdateMap rds_update_map); - void AcceptCdsUpdate(std::string version, grpc_millis update_time, - XdsApi::CdsUpdateMap cds_update_map); - void AcceptEdsUpdate(std::string version, grpc_millis update_time, - XdsApi::EdsUpdateMap eds_update_map); + void SendMessageLocked(const std::string& type_url) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + + void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time, + XdsApi::LdsUpdateMap lds_update_map) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time, + XdsApi::RdsUpdateMap rds_update_map) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time, + XdsApi::CdsUpdateMap cds_update_map) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time, + XdsApi::EdsUpdateMap eds_update_map) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnRequestSent(void* arg, grpc_error* error); - void OnRequestSentLocked(grpc_error* error); + void OnRequestSentLocked(grpc_error* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnResponseReceived(void* arg, grpc_error* error); - bool OnResponseReceivedLocked(); + bool OnResponseReceivedLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnStatusReceived(void* arg, grpc_error* error); - void OnStatusReceivedLocked(grpc_error* error); + void OnStatusReceivedLocked(grpc_error* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentCallOnChannel() const; @@ -346,12 +355,15 @@ class XdsClient::ChannelState::LrsCallState void Orphan() override; private: - void ScheduleNextReportLocked(); + void ScheduleNextReportLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnNextReportTimer(void* arg, grpc_error* error); - bool OnNextReportTimerLocked(grpc_error* error); - bool SendReportLocked(); + bool OnNextReportTimerLocked(grpc_error* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnReportDone(void* arg, grpc_error* error); - bool OnReportDoneLocked(grpc_error* error); + bool OnReportDoneLocked(grpc_error* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentReporterOnCall() const { return this == parent_->reporter_.get(); @@ -371,11 +383,14 @@ class XdsClient::ChannelState::LrsCallState }; static void OnInitialRequestSent(void* arg, grpc_error* error); - void OnInitialRequestSentLocked(); + void OnInitialRequestSentLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnResponseReceived(void* arg, grpc_error* error); - bool OnResponseReceivedLocked(); + bool OnResponseReceivedLocked() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnStatusReceived(void* arg, grpc_error* error); - void OnStatusReceivedLocked(grpc_error* error); + void OnStatusReceivedLocked(grpc_error* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentCallOnChannel() const; @@ -431,7 +446,7 @@ class XdsClient::ChannelState::StateWatcher "[xds_client %p] xds channel in state:TRANSIENT_FAILURE " "status_message:(%s)", parent_->xds_client(), status.ToString().c_str()); - parent_->xds_client()->NotifyOnErrorLocked( + parent_->xds_client_->NotifyOnErrorLocked( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "xds channel in TRANSIENT_FAILURE")); } @@ -446,26 +461,13 @@ class XdsClient::ChannelState::StateWatcher namespace { -grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) { - // Build channel args. - absl::InlinedVector args_to_add = { - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), - 5 * 60 * GPR_MS_PER_SEC), - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1), - }; - grpc_channel_args* new_args = grpc_channel_args_copy_and_add( - g_channel_args, args_to_add.data(), args_to_add.size()); - // Create channel creds. +grpc_channel* CreateXdsChannel(grpc_channel_args* args, + const XdsBootstrap::XdsServer& server) { RefCountedPtr channel_creds = XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type, server.channel_creds_config); - // Create channel. - grpc_channel* channel = grpc_secure_channel_create( - channel_creds.get(), server.server_uri.c_str(), new_args, nullptr); - grpc_channel_args_destroy(new_args); - return channel; + return grpc_secure_channel_create(channel_creds.get(), + server.server_uri.c_str(), args, nullptr); } } // namespace @@ -482,7 +484,7 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", xds_client_.get(), server.server_uri.c_str()); } - channel_ = CreateXdsChannel(server); + channel_ = CreateXdsChannel(xds_client_->args_, server); GPR_ASSERT(channel_ != nullptr); StartConnectivityWatchLocked(); } @@ -543,8 +545,8 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() { grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_); } -void XdsClient::ChannelState::Subscribe(const std::string& type_url, - const std::string& name) { +void XdsClient::ChannelState::SubscribeLocked(const std::string& type_url, + const std::string& name) { if (ads_calld_ == nullptr) { // Start the ADS call if this is the first request. ads_calld_.reset(new RetryableCall( @@ -558,16 +560,16 @@ void XdsClient::ChannelState::Subscribe(const std::string& type_url, // because when the call is restarted it will resend all necessary requests. if (ads_calld() == nullptr) return; // Subscribe to this resource if the ADS call is active. - ads_calld()->Subscribe(type_url, name); + ads_calld()->SubscribeLocked(type_url, name); } -void XdsClient::ChannelState::Unsubscribe(const std::string& type_url, - const std::string& name, - bool delay_unsubscription) { +void XdsClient::ChannelState::UnsubscribeLocked(const std::string& type_url, + const std::string& name, + bool delay_unsubscription) { if (ads_calld_ != nullptr) { auto* calld = ads_calld_->calld(); if (calld != nullptr) { - calld->Unsubscribe(type_url, name, delay_unsubscription); + calld->UnsubscribeLocked(type_url, name, delay_unsubscription); if (!calld->HasSubscribedResources()) ads_calld_.reset(); } } @@ -729,16 +731,16 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, grpc_schedule_on_exec_ctx); for (const auto& p : xds_client()->listener_map_) { - Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first)); + SubscribeLocked(XdsApi::kLdsTypeUrl, std::string(p.first)); } for (const auto& p : xds_client()->route_config_map_) { - Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first)); + SubscribeLocked(XdsApi::kRdsTypeUrl, std::string(p.first)); } for (const auto& p : xds_client()->cluster_map_) { - Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first)); + SubscribeLocked(XdsApi::kCdsTypeUrl, std::string(p.first)); } for (const auto& p : xds_client()->endpoint_map_) { - Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first)); + SubscribeLocked(XdsApi::kEdsTypeUrl, std::string(p.first)); } // Op: recv initial metadata. op = ops; @@ -802,7 +804,8 @@ void XdsClient::ChannelState::AdsCallState::Orphan() { } void XdsClient::ChannelState::AdsCallState::SendMessageLocked( - const std::string& type_url) { + const std::string& type_url) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { // Buffer message sending if an existing message is in flight. if (send_message_payload_ != nullptr) { buffered_requests_.insert(type_url); @@ -854,7 +857,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } } -void XdsClient::ChannelState::AdsCallState::Subscribe( +void XdsClient::ChannelState::AdsCallState::SubscribeLocked( const std::string& type_url, const std::string& name) { auto& state = state_map_[type_url].subscribed_resources[name]; if (state == nullptr) { @@ -864,7 +867,7 @@ void XdsClient::ChannelState::AdsCallState::Subscribe( } } -void XdsClient::ChannelState::AdsCallState::Unsubscribe( +void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked( const std::string& type_url, const std::string& name, bool delay_unsubscription) { state_map_[type_url].subscribed_resources.erase(name); @@ -894,7 +897,7 @@ XdsApi::ResourceMetadata CreateResourceMetadataAcked( } // namespace -void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( +void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( std::string version, grpc_millis update_time, XdsApi::LdsUpdateMap lds_update_map) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -978,7 +981,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( } } -void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( +void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked( std::string version, grpc_millis update_time, XdsApi::RdsUpdateMap rds_update_map) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -1020,7 +1023,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( } } -void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( +void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( std::string version, grpc_millis update_time, XdsApi::CdsUpdateMap cds_update_map) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -1101,7 +1104,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( } } -void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( +void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked( std::string version, grpc_millis update_time, XdsApi::EdsUpdateMap eds_update_map) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -1221,8 +1224,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { state.nonce = std::move(result.nonce); // NACK or ACK the response. if (result.parse_error != GRPC_ERROR_NONE) { - xds_client()->UpdateResourceMetadataWithFailedParseResult(update_time, - result); + xds_client()->UpdateResourceMetadataWithFailedParseResultLocked( + update_time, result); GRPC_ERROR_UNREF(state.error); state.error = result.parse_error; // NACK unacceptable update. @@ -1236,17 +1239,17 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { seen_response_ = true; // Accept the ADS response according to the type_url. if (result.type_url == XdsApi::kLdsTypeUrl) { - AcceptLdsUpdate(result.version, update_time, - std::move(result.lds_update_map)); + AcceptLdsUpdateLocked(result.version, update_time, + std::move(result.lds_update_map)); } else if (result.type_url == XdsApi::kRdsTypeUrl) { - AcceptRdsUpdate(result.version, update_time, - std::move(result.rds_update_map)); + AcceptRdsUpdateLocked(result.version, update_time, + std::move(result.rds_update_map)); } else if (result.type_url == XdsApi::kCdsTypeUrl) { - AcceptCdsUpdate(result.version, update_time, - std::move(result.cds_update_map)); + AcceptCdsUpdateLocked(result.version, update_time, + std::move(result.cds_update_map)); } else if (result.type_url == XdsApi::kEdsTypeUrl) { - AcceptEdsUpdate(result.version, update_time, - std::move(result.eds_update_map)); + AcceptEdsUpdateLocked(result.version, update_time, + std::move(result.eds_update_map)); } xds_client()->resource_version_map_[result.type_url] = std::move(result.version); @@ -1769,19 +1772,20 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { namespace { -grpc_millis GetRequestTimeout() { +grpc_millis GetRequestTimeout(grpc_channel_args* args) { return grpc_channel_args_find_integer( - g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, + args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, {15000, 0, INT_MAX}); } } // namespace -XdsClient::XdsClient(grpc_error** error) +XdsClient::XdsClient(grpc_channel_args* args, grpc_error** error) : DualRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient" : nullptr), - request_timeout_(GetRequestTimeout()), + args_(args), + request_timeout_(GetRequestTimeout(args)), interested_parties_(grpc_pollset_set_create()), bootstrap_(XdsBootstrap::Create(this, &grpc_xds_client_trace, g_fallback_bootstrap_config, error)), @@ -1808,11 +1812,13 @@ XdsClient::~XdsClient() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this); } + grpc_channel_args_destroy(args_); grpc_pollset_set_destroy(interested_parties_); } void XdsClient::AddChannelzLinkage( channelz::ChannelNode* parent_channelz_node) { + MutexLock lock(&mu_); channelz::ChannelNode* xds_channelz_node = grpc_channel_get_channelz_node(chand_->channel()); if (xds_channelz_node != nullptr) { @@ -1822,6 +1828,7 @@ void XdsClient::AddChannelzLinkage( void XdsClient::RemoveChannelzLinkage( channelz::ChannelNode* parent_channelz_node) { + MutexLock lock(&mu_); channelz::ChannelNode* xds_channelz_node = grpc_channel_get_channelz_node(chand_->channel()); if (xds_channelz_node != nullptr) { @@ -1872,7 +1879,7 @@ void XdsClient::WatchListenerData( } w->OnListenerChanged(*listener_state.update); } - chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str); + chand_->SubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str); } void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, @@ -1887,8 +1894,8 @@ void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, listener_state.watchers.erase(it); if (listener_state.watchers.empty()) { listener_map_.erase(listener_name_str); - chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str, - delay_unsubscription); + chand_->UnsubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str, + delay_unsubscription); } } } @@ -1912,7 +1919,7 @@ void XdsClient::WatchRouteConfigData( } w->OnRouteConfigChanged(*route_config_state.update); } - chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str); + chand_->SubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str); } void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, @@ -1928,8 +1935,8 @@ void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, route_config_state.watchers.erase(it); if (route_config_state.watchers.empty()) { route_config_map_.erase(route_config_name_str); - chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str, - delay_unsubscription); + chand_->UnsubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str, + delay_unsubscription); } } } @@ -1951,7 +1958,7 @@ void XdsClient::WatchClusterData( } w->OnClusterChanged(cluster_state.update.value()); } - chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str); + chand_->SubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str); } void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name, @@ -1966,8 +1973,8 @@ void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name, cluster_state.watchers.erase(it); if (cluster_state.watchers.empty()) { cluster_map_.erase(cluster_name_str); - chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str, - delay_unsubscription); + chand_->UnsubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str, + delay_unsubscription); } } } @@ -1989,7 +1996,7 @@ void XdsClient::WatchEndpointData( } w->OnEndpointChanged(endpoint_state.update.value()); } - chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str); + chand_->SubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str); } void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name, @@ -2004,8 +2011,8 @@ void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name, endpoint_state.watchers.erase(it); if (endpoint_state.watchers.empty()) { endpoint_map_.erase(eds_service_name_str); - chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str, - delay_unsubscription); + chand_->UnsubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str, + delay_unsubscription); } } } @@ -2241,7 +2248,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( return snapshot_map; } -void XdsClient::UpdateResourceMetadataWithFailedParseResult( +void XdsClient::UpdateResourceMetadataWithFailedParseResultLocked( grpc_millis update_time, const XdsApi::AdsParseResult& result) { // ADS update is rejected and the resource names in the failed update is // available. @@ -2324,11 +2331,13 @@ void XdsClientGlobalInit() { XdsHttpFilterRegistry::Init(); } -void XdsClientGlobalShutdown() { - delete g_mu; - g_mu = nullptr; +// TODO(roth): Find a better way to clear the fallback config that does +// not require using ABSL_NO_THREAD_SAFETY_ANALYSIS. +void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS { gpr_free(g_fallback_bootstrap_config); g_fallback_bootstrap_config = nullptr; + delete g_mu; + g_mu = nullptr; XdsHttpFilterRegistry::Shutdown(); } @@ -2340,7 +2349,18 @@ RefCountedPtr XdsClient::GetOrCreate(grpc_error** error) { auto xds_client = g_xds_client->RefIfNonZero(); if (xds_client != nullptr) return xds_client; } - xds_client = MakeRefCounted(error); + // Build channel args. + absl::InlinedVector args_to_add = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), + 5 * 60 * GPR_MS_PER_SEC), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1), + }; + grpc_channel_args* args = grpc_channel_args_copy_and_add( + g_channel_args, args_to_add.data(), args_to_add.size()); + // Instantiate XdsClient. + xds_client = MakeRefCounted(args, error); if (*error != GRPC_ERROR_NONE) return nullptr; g_xds_client = xds_client.get(); } diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 41e0a379568..beb29b968b5 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -85,7 +85,7 @@ class XdsClient : public DualRefCounted { static RefCountedPtr GetOrCreate(grpc_error** error); // Callers should not instantiate directly. Use GetOrCreate() instead. - explicit XdsClient(grpc_error** error); + XdsClient(grpc_channel_args* args, grpc_error** error); ~XdsClient() override; const XdsBootstrap& bootstrap() const { @@ -236,9 +236,11 @@ class XdsClient : public DualRefCounted { void StartConnectivityWatchLocked(); void CancelConnectivityWatchLocked(); - void Subscribe(const std::string& type_url, const std::string& name); - void Unsubscribe(const std::string& type_url, const std::string& name, - bool delay_unsubscription); + void SubscribeLocked(const std::string& type_url, const std::string& name) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void UnsubscribeLocked(const std::string& type_url, const std::string& name, + bool delay_unsubscription) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); private: class StateWatcher; @@ -308,17 +310,18 @@ class XdsClient : public DualRefCounted { }; // Sends an error notification to all watchers. - void NotifyOnErrorLocked(grpc_error* error); + void NotifyOnErrorLocked(grpc_error* error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( - bool send_all_clusters, const std::set& clusters); + bool send_all_clusters, const std::set& clusters) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - void UpdateResourceMetadataWithFailedParseResult( - grpc_millis update_time, const XdsApi::AdsParseResult& result); - void UpdatePendingResources( - const std::string& type_url, - XdsApi::ResourceMetadataMap* resource_metadata_map); + void UpdateResourceMetadataWithFailedParseResultLocked( + grpc_millis update_time, const XdsApi::AdsParseResult& result) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + grpc_channel_args* args_; const grpc_millis request_timeout_; grpc_pollset_set* interested_parties_; std::unique_ptr bootstrap_; @@ -328,28 +331,32 @@ class XdsClient : public DualRefCounted { Mutex mu_; // The channel for communicating with the xds server. - OrphanablePtr chand_; + OrphanablePtr chand_ ABSL_GUARDED_BY(mu_); // One entry for each watched LDS resource. - std::map listener_map_; + std::map listener_map_ + ABSL_GUARDED_BY(mu_); // One entry for each watched RDS resource. std::map - route_config_map_; + route_config_map_ ABSL_GUARDED_BY(mu_); // One entry for each watched CDS resource. - std::map cluster_map_; + std::map cluster_map_ + ABSL_GUARDED_BY(mu_); // One entry for each watched EDS resource. - std::map endpoint_map_; + std::map endpoint_map_ + ABSL_GUARDED_BY(mu_); // Load report data. std::map< std::pair, LoadReportState> - load_report_map_; + load_report_map_ ABSL_GUARDED_BY(mu_); // Stores the most recent accepted resource version for each resource type. - std::map resource_version_map_; + std::map resource_version_map_ + ABSL_GUARDED_BY(mu_); - bool shutting_down_ = false; + bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; }; namespace internal { @@ -362,4 +369,4 @@ void SetXdsFallbackBootstrapConfig(const char* config); } // namespace grpc_core -#endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */ +#endif // GRPC_CORE_EXT_XDS_XDS_CLIENT_H diff --git a/src/core/ext/xds/xds_client_stats.h b/src/core/ext/xds/xds_client_stats.h index 852dda26089..b300fc59256 100644 --- a/src/core/ext/xds/xds_client_stats.h +++ b/src/core/ext/xds/xds_client_stats.h @@ -149,7 +149,7 @@ class XdsClusterDropStats : public RefCounted { // dropped_requests can be accessed by both the picker (from data plane // mutex) and the load reporting thread (from the control plane combiner). Mutex mu_; - CategorizedDropsMap categorized_drops_; + CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_); }; // Locality stats for an xds cluster. @@ -231,7 +231,8 @@ class XdsClusterLocalityStats : public RefCounted { // call's recv_trailing_metadata (not from the control plane work serializer) // and the load reporting thread (from the control plane work serializer). Mutex backend_metrics_mu_; - std::map backend_metrics_; + std::map backend_metrics_ + ABSL_GUARDED_BY(backend_metrics_mu_); }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc index 0abfd71593b..1c5d59f55c2 100644 --- a/src/core/ext/xds/xds_server_config_fetcher.cc +++ b/src/core/ext/xds/xds_server_config_fetcher.cc @@ -512,7 +512,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { grpc_server_xds_status_notifier serving_status_notifier_; Mutex mu_; std::map - watchers_; + watchers_ ABSL_GUARDED_BY(mu_); }; } // namespace