From 4c9850d5305c22d52ec5b722d64463b56bbba3ff Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 30 Aug 2024 22:40:16 +0000 Subject: [PATCH] remove old LRS code --- BUILD | 2 +- src/core/xds/xds_client/xds_api.cc | 220 -------- src/core/xds/xds_client/xds_api.h | 24 - src/core/xds/xds_client/xds_client.cc | 566 -------------------- src/core/xds/xds_client/xds_client.h | 76 +-- src/core/xds/xds_client/xds_client_stats.cc | 164 ------ src/core/xds/xds_client/xds_client_stats.h | 162 +----- 7 files changed, 16 insertions(+), 1198 deletions(-) delete mode 100644 src/core/xds/xds_client/xds_client_stats.cc diff --git a/BUILD b/BUILD index 373f2bd50e0..f7735f64bd8 100644 --- a/BUILD +++ b/BUILD @@ -4398,7 +4398,6 @@ grpc_cc_library( "//src/core:xds/xds_client/xds_api.cc", "//src/core:xds/xds_client/xds_bootstrap.cc", "//src/core:xds/xds_client/xds_client.cc", - "//src/core:xds/xds_client/xds_client_stats.cc", ], hdrs = [ "//src/core:xds/xds_client/lrs_client.h", @@ -4465,6 +4464,7 @@ grpc_cc_library( "//src/core:json", "//src/core:per_cpu", "//src/core:ref_counted", + "//src/core:ref_counted_string", "//src/core:time", "//src/core:upb_utils", "//src/core:useful", diff --git a/src/core/xds/xds_client/xds_api.cc b/src/core/xds/xds_client/xds_api.cc index ebc7c578eb6..50522961920 100644 --- a/src/core/xds/xds_client/xds_api.cc +++ b/src/core/xds/xds_client/xds_api.cc @@ -352,224 +352,4 @@ absl::Status XdsApi::ParseAdsResponse(absl::string_view encoded_response, return absl::OkStatus(); } -namespace { - -void MaybeLogLrsRequest( - const XdsApiContext& context, - const envoy_service_load_stats_v3_LoadStatsRequest* request) { - if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) { - const upb_MessageDef* msg_type = - envoy_service_load_stats_v3_LoadStatsRequest_getmsgdef( - context.def_pool); - char buf[10240]; - upb_TextEncode(reinterpret_cast(request), msg_type, - nullptr, 0, buf, sizeof(buf)); - VLOG(2) << "[xds_client " << context.client - << "] constructed LRS request: " << buf; - } -} - -std::string SerializeLrsRequest( - const XdsApiContext& context, - const envoy_service_load_stats_v3_LoadStatsRequest* request) { - size_t output_length; - char* output = envoy_service_load_stats_v3_LoadStatsRequest_serialize( - request, context.arena, &output_length); - return std::string(output, output_length); -} - -} // namespace - -std::string XdsApi::CreateLrsInitialRequest() { - upb::Arena arena; - const XdsApiContext context = {client_, tracer_, def_pool_->ptr(), - arena.ptr()}; - // Create a request. - envoy_service_load_stats_v3_LoadStatsRequest* request = - envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr()); - // Populate node. - envoy_config_core_v3_Node* node_msg = - envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request, - arena.ptr()); - PopulateNode(node_msg, arena.ptr()); - envoy_config_core_v3_Node_add_client_features( - node_msg, - upb_StringView_FromString("envoy.lrs.supports_send_all_clusters"), - arena.ptr()); - MaybeLogLrsRequest(context, request); - return SerializeLrsRequest(context, request); -} - -namespace { - -void LocalityStatsPopulate( - const XdsApiContext& context, - envoy_config_endpoint_v3_UpstreamLocalityStats* output, - const XdsLocalityName& locality_name, - const XdsClusterLocalityStats::Snapshot& snapshot) { - // Set locality. - envoy_config_core_v3_Locality* locality = - envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_locality( - output, context.arena); - if (!locality_name.region().empty()) { - envoy_config_core_v3_Locality_set_region( - locality, StdStringToUpbString(locality_name.region())); - } - if (!locality_name.zone().empty()) { - envoy_config_core_v3_Locality_set_zone( - locality, StdStringToUpbString(locality_name.zone())); - } - if (!locality_name.sub_zone().empty()) { - envoy_config_core_v3_Locality_set_sub_zone( - locality, StdStringToUpbString(locality_name.sub_zone())); - } - // Set total counts. - envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_successful_requests( - output, snapshot.total_successful_requests); - envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_requests_in_progress( - output, snapshot.total_requests_in_progress); - envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_error_requests( - output, snapshot.total_error_requests); - envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests( - output, snapshot.total_issued_requests); - // Add backend metrics. - for (const auto& p : snapshot.backend_metrics) { - const std::string& metric_name = p.first; - const XdsClusterLocalityStats::BackendMetric& metric_value = p.second; - envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric = - envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats( - output, context.arena); - envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name( - load_metric, StdStringToUpbString(metric_name)); - envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric( - load_metric, metric_value.num_requests_finished_with_metric); - envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value( - load_metric, metric_value.total_metric_value); - } -} - -} // namespace - -std::string XdsApi::CreateLrsRequest( - ClusterLoadReportMap cluster_load_report_map) { - upb::Arena arena; - const XdsApiContext context = {client_, tracer_, def_pool_->ptr(), - arena.ptr()}; - // Create a request. - envoy_service_load_stats_v3_LoadStatsRequest* request = - envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr()); - for (auto& p : cluster_load_report_map) { - const std::string& cluster_name = p.first.first; - const std::string& eds_service_name = p.first.second; - const ClusterLoadReport& load_report = p.second; - // Add cluster stats. - envoy_config_endpoint_v3_ClusterStats* cluster_stats = - envoy_service_load_stats_v3_LoadStatsRequest_add_cluster_stats( - request, arena.ptr()); - // Set the cluster name. - envoy_config_endpoint_v3_ClusterStats_set_cluster_name( - cluster_stats, StdStringToUpbString(cluster_name)); - // Set EDS service name, if non-empty. - if (!eds_service_name.empty()) { - envoy_config_endpoint_v3_ClusterStats_set_cluster_service_name( - cluster_stats, StdStringToUpbString(eds_service_name)); - } - // Add locality stats. - for (const auto& p : load_report.locality_stats) { - const XdsLocalityName& locality_name = *p.first; - const auto& snapshot = p.second; - envoy_config_endpoint_v3_UpstreamLocalityStats* locality_stats = - envoy_config_endpoint_v3_ClusterStats_add_upstream_locality_stats( - cluster_stats, arena.ptr()); - LocalityStatsPopulate(context, locality_stats, locality_name, snapshot); - } - // Add dropped requests. - uint64_t total_dropped_requests = 0; - for (const auto& p : load_report.dropped_requests.categorized_drops) { - const std::string& category = p.first; - const uint64_t count = p.second; - envoy_config_endpoint_v3_ClusterStats_DroppedRequests* dropped_requests = - envoy_config_endpoint_v3_ClusterStats_add_dropped_requests( - cluster_stats, arena.ptr()); - envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_category( - dropped_requests, StdStringToUpbString(category)); - envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_dropped_count( - dropped_requests, count); - total_dropped_requests += count; - } - total_dropped_requests += load_report.dropped_requests.uncategorized_drops; - // Set total dropped requests. - envoy_config_endpoint_v3_ClusterStats_set_total_dropped_requests( - cluster_stats, total_dropped_requests); - // Set real load report interval. - gpr_timespec timespec = load_report.load_report_interval.as_timespec(); - google_protobuf_Duration* load_report_interval = - envoy_config_endpoint_v3_ClusterStats_mutable_load_report_interval( - cluster_stats, arena.ptr()); - google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec); - google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec); - } - MaybeLogLrsRequest(context, request); - return SerializeLrsRequest(context, request); -} - -namespace { - -void MaybeLogLrsResponse( - const XdsApiContext& context, - const envoy_service_load_stats_v3_LoadStatsResponse* response) { - if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) { - const upb_MessageDef* msg_type = - envoy_service_load_stats_v3_LoadStatsResponse_getmsgdef( - context.def_pool); - char buf[10240]; - upb_TextEncode(reinterpret_cast(response), msg_type, - nullptr, 0, buf, sizeof(buf)); - VLOG(2) << "[xds_client " << context.client - << "] received LRS response: " << buf; - } -} - -} // namespace - -absl::Status XdsApi::ParseLrsResponse(absl::string_view encoded_response, - bool* send_all_clusters, - std::set* cluster_names, - Duration* load_reporting_interval) { - upb::Arena arena; - // Decode the response. - const envoy_service_load_stats_v3_LoadStatsResponse* decoded_response = - envoy_service_load_stats_v3_LoadStatsResponse_parse( - encoded_response.data(), encoded_response.size(), arena.ptr()); - // Parse the response. - if (decoded_response == nullptr) { - return absl::UnavailableError("Can't decode response."); - } - const XdsApiContext context = {client_, tracer_, def_pool_->ptr(), - arena.ptr()}; - MaybeLogLrsResponse(context, decoded_response); - // Check send_all_clusters. - if (envoy_service_load_stats_v3_LoadStatsResponse_send_all_clusters( - decoded_response)) { - *send_all_clusters = true; - } else { - // Store the cluster names. - size_t size; - const upb_StringView* clusters = - envoy_service_load_stats_v3_LoadStatsResponse_clusters(decoded_response, - &size); - for (size_t i = 0; i < size; ++i) { - cluster_names->emplace(UpbStringToStdString(clusters[i])); - } - } - // Get the load report interval. - const google_protobuf_Duration* load_reporting_interval_duration = - envoy_service_load_stats_v3_LoadStatsResponse_load_reporting_interval( - decoded_response); - *load_reporting_interval = Duration::FromSecondsAndNanoseconds( - google_protobuf_Duration_seconds(load_reporting_interval_duration), - google_protobuf_Duration_nanos(load_reporting_interval_duration)); - return absl::OkStatus(); -} - } // namespace grpc_core diff --git a/src/core/xds/xds_client/xds_api.h b/src/core/xds/xds_client/xds_api.h index f62f70a9436..36adb86e046 100644 --- a/src/core/xds/xds_client/xds_api.h +++ b/src/core/xds/xds_client/xds_api.h @@ -81,17 +81,6 @@ class XdsApi final { absl::string_view message) = 0; }; - struct ClusterLoadReport { - XdsClusterDropStats::Snapshot dropped_requests; - std::map, XdsClusterLocalityStats::Snapshot, - XdsLocalityName::Less> - locality_stats; - Duration load_report_interval; - }; - using ClusterLoadReportMap = std::map< - std::pair, - ClusterLoadReport>; - // The metadata of the xDS resource; used by the xDS config dump. struct ResourceMetadata { // Resource status from the view of a xDS client, which tells the @@ -160,19 +149,6 @@ class XdsApi final { absl::Status ParseAdsResponse(absl::string_view encoded_response, AdsResponseParserInterface* parser); - // Creates an initial LRS request. - std::string CreateLrsInitialRequest(); - - // Creates an LRS request sending a client-side load report. - std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map); - - // Parses the LRS response and populates send_all_clusters, - // cluster_names, and load_reporting_interval. - absl::Status ParseLrsResponse(absl::string_view encoded_response, - bool* send_all_clusters, - std::set* cluster_names, - Duration* load_reporting_interval); - void PopulateNode(envoy_config_core_v3_Node* node_msg, upb_Arena* arena); private: diff --git a/src/core/xds/xds_client/xds_client.cc b/src/core/xds/xds_client/xds_client.cc index 667fe7083dd..b08166a7d17 100644 --- a/src/core/xds/xds_client/xds_client.cc +++ b/src/core/xds/xds_client/xds_client.cc @@ -350,100 +350,6 @@ class XdsClient::XdsChannel::AdsCall final std::map state_map_; }; -// Contains an LRS call to the xds server. -class XdsClient::XdsChannel::LrsCall final - : public InternallyRefCounted { - public: - // The ctor and dtor should not be used directly. - explicit LrsCall(RefCountedPtr> retryable_call); - - void Orphan() override; - - RetryableCall* retryable_call() { return retryable_call_.get(); } - XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); } - XdsClient* xds_client() const { return xds_channel()->xds_client(); } - bool seen_response() const { return seen_response_; } - - private: - class StreamEventHandler final - : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler { - public: - explicit StreamEventHandler(RefCountedPtr lrs_call) - : lrs_call_(std::move(lrs_call)) {} - - void OnRequestSent(bool /*ok*/) override { lrs_call_->OnRequestSent(); } - void OnRecvMessage(absl::string_view payload) override { - lrs_call_->OnRecvMessage(payload); - } - void OnStatusReceived(absl::Status status) override { - lrs_call_->OnStatusReceived(std::move(status)); - } - - private: - RefCountedPtr lrs_call_; - }; - - // A repeating timer for a particular duration. - class Timer final : public InternallyRefCounted { - public: - explicit Timer(RefCountedPtr lrs_call) - : lrs_call_(std::move(lrs_call)) {} - ~Timer() override { lrs_call_.reset(DEBUG_LOCATION, "LRS timer"); } - - // Disable thread-safety analysis because this method is called via - // OrphanablePtr<>, but there's no way to pass the lock annotation - // through there. - void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; - - void ScheduleNextReportLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - - private: - bool IsCurrentTimerOnCall() const { - return this == lrs_call_->timer_.get(); - } - XdsClient* xds_client() const { return lrs_call_->xds_client(); } - - void OnNextReportTimer(); - - // The owning LRS call. - RefCountedPtr lrs_call_; - - absl::optional timer_handle_ - ABSL_GUARDED_BY(&XdsClient::mu_); - }; - - void MaybeScheduleNextReportLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - - void SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - - void SendMessageLocked(std::string payload) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - - void OnRequestSent(); - void OnRecvMessage(absl::string_view payload); - void OnStatusReceived(absl::Status status); - - bool IsCurrentCallOnChannel() const; - - // The owning RetryableCall<>. - RefCountedPtr> retryable_call_; - - OrphanablePtr - streaming_call_; - - bool seen_response_ = false; - bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; - - // Load reporting state. - bool send_all_clusters_ = false; - std::set cluster_names_; // Asked for by the LRS server. - Duration load_reporting_interval_; - bool last_report_counters_were_zero_ = false; - OrphanablePtr timer_; -}; - // // XdsClient::XdsChannel::ConnectivityFailureWatcher // @@ -513,7 +419,6 @@ void XdsClient::XdsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS { // it is shutting down. xds_client_->xds_channel_map_.erase(server_.Key()); ads_call_.reset(); - lrs_call_.reset(); } void XdsClient::XdsChannel::ResetBackoff() { transport_->ResetBackoff(); } @@ -522,21 +427,6 @@ XdsClient::XdsChannel::AdsCall* XdsClient::XdsChannel::ads_call() const { return ads_call_->call(); } -XdsClient::XdsChannel::LrsCall* XdsClient::XdsChannel::lrs_call() const { - return lrs_call_->call(); -} - -void XdsClient::XdsChannel::MaybeStartLrsCall() { - if (lrs_call_ != nullptr) return; - lrs_call_.reset( - new RetryableCall(WeakRef(DEBUG_LOCATION, "XdsChannel+lrs"))); -} - -void XdsClient::XdsChannel::StopLrsCallLocked() { - xds_client_->xds_load_report_server_map_.erase(server_.Key()); - lrs_call_.reset(); -} - void XdsClient::XdsChannel::SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name) { if (ads_call_ == nullptr) { @@ -1298,241 +1188,6 @@ XdsClient::XdsChannel::AdsCall::ResourceNamesForRequest( return resource_names; } -// -// XdsClient::XdsChannel::LrsCall::Timer -// - -void XdsClient::XdsChannel::LrsCall::Timer::Orphan() { - if (timer_handle_.has_value()) { - xds_client()->engine()->Cancel(*timer_handle_); - timer_handle_.reset(); - } - Unref(DEBUG_LOCATION, "Orphan"); -} - -void XdsClient::XdsChannel::LrsCall::Timer::ScheduleNextReportLocked() { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client() << "] xds server " - << lrs_call_->xds_channel()->server_.server_uri() - << ": scheduling next load report in " - << lrs_call_->load_reporting_interval_; - timer_handle_ = xds_client()->engine()->RunAfter( - lrs_call_->load_reporting_interval_, - [self = Ref(DEBUG_LOCATION, "timer")]() { - ApplicationCallbackExecCtx callback_exec_ctx; - ExecCtx exec_ctx; - self->OnNextReportTimer(); - }); -} - -void XdsClient::XdsChannel::LrsCall::Timer::OnNextReportTimer() { - MutexLock lock(&xds_client()->mu_); - timer_handle_.reset(); - if (IsCurrentTimerOnCall()) lrs_call_->SendReportLocked(); -} - -// -// XdsClient::XdsChannel::LrsCall -// - -XdsClient::XdsChannel::LrsCall::LrsCall( - RefCountedPtr> retryable_call) - : InternallyRefCounted( - GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsCall" : nullptr), - retryable_call_(std::move(retryable_call)) { - // Init the LRS call. Note that the call will progress every time there's - // activity in xds_client()->interested_parties_, which is comprised of - // the polling entities from client_channel. - CHECK_NE(xds_client(), nullptr); - const char* method = - "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"; - streaming_call_ = xds_channel()->transport_->CreateStreamingCall( - method, std::make_unique( - // Passing the initial ref here. This ref will go away when - // the StreamEventHandler is destroyed. - RefCountedPtr(this))); - CHECK(streaming_call_ != nullptr); - // Start the call. - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": starting LRS call (lrs_call=" << this - << ", streaming_call=" << streaming_call_.get() << ")"; - // Send the initial request. - std::string serialized_payload = xds_client()->api_.CreateLrsInitialRequest(); - SendMessageLocked(std::move(serialized_payload)); - // Read initial response. - streaming_call_->StartRecvMessage(); -} - -void XdsClient::XdsChannel::LrsCall::Orphan() { - timer_.reset(); - // Note that the initial ref is held by the StreamEventHandler, which - // will be destroyed when streaming_call_ is destroyed, which may not happen - // here, since there may be other refs held to streaming_call_ by internal - // callbacks. - streaming_call_.reset(); -} - -void XdsClient::XdsChannel::LrsCall::MaybeScheduleNextReportLocked() { - // If there are no more registered stats to report, cancel the call. - auto it = xds_client()->xds_load_report_server_map_.find( - xds_channel()->server_.Key()); - if (it == xds_client()->xds_load_report_server_map_.end() || - it->second.load_report_map.empty()) { - it->second.xds_channel->StopLrsCallLocked(); - return; - } - // Don't start if the previous send_message op hasn't completed yet. - // If this happens, we'll be called again from OnRequestSent(). - if (send_message_pending_) return; - // Don't start if no LRS response has arrived. - if (!seen_response()) return; - // If there is no timer, create one. - // This happens on the initial response and whenever the interval changes. - if (timer_ == nullptr) { - timer_ = MakeOrphanable(Ref(DEBUG_LOCATION, "LRS timer")); - } - // Schedule the next load report. - timer_->ScheduleNextReportLocked(); -} - -namespace { - -bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) { - for (const auto& p : snapshot) { - const XdsApi::ClusterLoadReport& cluster_snapshot = p.second; - if (!cluster_snapshot.dropped_requests.IsZero()) return false; - for (const auto& q : cluster_snapshot.locality_stats) { - const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second; - if (!locality_snapshot.IsZero()) return false; - } - } - return true; -} - -} // namespace - -void XdsClient::XdsChannel::LrsCall::SendReportLocked() { - // Construct snapshot from all reported stats. - XdsApi::ClusterLoadReportMap snapshot = - xds_client()->BuildLoadReportSnapshotLocked( - xds_channel()->server_, send_all_clusters_, cluster_names_); - // Skip client load report if the counters were all zero in the last - // report and they are still zero in this one. - const bool old_val = last_report_counters_were_zero_; - last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); - if (old_val && last_report_counters_were_zero_) { - MaybeScheduleNextReportLocked(); - return; - } - // Send a request that contains the snapshot. - std::string serialized_payload = - xds_client()->api_.CreateLrsRequest(std::move(snapshot)); - SendMessageLocked(std::move(serialized_payload)); -} - -void XdsClient::XdsChannel::LrsCall::SendMessageLocked(std::string payload) { - send_message_pending_ = true; - streaming_call_->SendMessage(std::move(payload)); -} - -void XdsClient::XdsChannel::LrsCall::OnRequestSent() { - MutexLock lock(&xds_client()->mu_); - send_message_pending_ = false; - if (IsCurrentCallOnChannel()) MaybeScheduleNextReportLocked(); -} - -void XdsClient::XdsChannel::LrsCall::OnRecvMessage(absl::string_view payload) { - MutexLock lock(&xds_client()->mu_); - // If we're no longer the current call, ignore the result. - if (!IsCurrentCallOnChannel()) return; - // Start recv after any code branch - auto cleanup = absl::MakeCleanup( - [call = streaming_call_.get()]() { call->StartRecvMessage(); }); - // Parse the response. - bool send_all_clusters = false; - std::set new_cluster_names; - Duration new_load_reporting_interval; - absl::Status status = xds_client()->api_.ParseLrsResponse( - payload, &send_all_clusters, &new_cluster_names, - &new_load_reporting_interval); - if (!status.ok()) { - LOG(ERROR) << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": LRS response parsing failed: " << status; - return; - } - seen_response_ = true; - if (GRPC_TRACE_FLAG_ENABLED(xds_client)) { - LOG(INFO) << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": LRS response received, " << new_cluster_names.size() - << " cluster names, send_all_clusters=" << send_all_clusters - << ", load_report_interval=" - << new_load_reporting_interval.millis() << "ms"; - size_t i = 0; - for (const auto& name : new_cluster_names) { - LOG(INFO) << "[xds_client " << xds_client() << "] cluster_name " << i++ - << ": " << name; - } - } - if (new_load_reporting_interval < - Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) { - new_load_reporting_interval = - Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": increased load_report_interval to minimum value " - << GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS << "ms"; - } - // Ignore identical update. - if (send_all_clusters == send_all_clusters_ && - cluster_names_ == new_cluster_names && - load_reporting_interval_ == new_load_reporting_interval) { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": incoming LRS response identical to current, ignoring."; - return; - } - // If the interval has changed, we'll need to restart the timer below. - const bool restart_timer = - load_reporting_interval_ != new_load_reporting_interval; - // Record the new config. - send_all_clusters_ = send_all_clusters; - cluster_names_ = std::move(new_cluster_names); - load_reporting_interval_ = new_load_reporting_interval; - // Restart timer if needed. - if (restart_timer) { - timer_.reset(); - MaybeScheduleNextReportLocked(); - } -} - -void XdsClient::XdsChannel::LrsCall::OnStatusReceived(absl::Status status) { - MutexLock lock(&xds_client()->mu_); - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": LRS call status received (xds_channel=" << xds_channel() - << ", lrs_call=" << this << ", streaming_call=" << streaming_call_.get() - << "): " << status; - // Ignore status from a stale call. - if (IsCurrentCallOnChannel()) { - // Try to restart the call. - retryable_call_->OnCallFinishedLocked(); - } -} - -bool XdsClient::XdsChannel::LrsCall::IsCurrentCallOnChannel() const { - // If the retryable LRS call is null (which only happens when the xds - // channel is shutting down), all the LRS calls are stale. - if (xds_channel()->lrs_call_ == nullptr) return false; - return this == xds_channel()->lrs_call_->call(); -} - // // XdsClient // @@ -1580,13 +1235,6 @@ void XdsClient::Orphaned() { // Clear cache and any remaining watchers that may not have been cancelled. authority_state_map_.clear(); invalid_watchers_.clear(); - // We may still be sending lingering queued load report data, so don't - // just clear the load reporting map, but we do want to clear the refs - // we're holding to the XdsChannel objects, to make sure that - // everything shuts down properly. - for (auto& p : xds_load_report_server_map_) { - p.second.xds_channel.reset(DEBUG_LOCATION, "XdsClient::Orphan()"); - } } RefCountedPtr XdsClient::GetOrCreateXdsChannelLocked( @@ -1860,140 +1508,6 @@ std::string XdsClient::ConstructFullXdsResourceName( return key.id; } -RefCountedPtr XdsClient::AddClusterDropStats( - const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, - absl::string_view eds_service_name) { - auto key = - std::make_pair(std::string(cluster_name), std::string(eds_service_name)); - RefCountedPtr cluster_drop_stats; - { - MutexLock lock(&mu_); - // We jump through some hoops here to make sure that the - // absl::string_views stored in the XdsClusterDropStats object point - // to the strings in the xds_load_report_server_map_ keys, so that - // they have the same lifetime. - auto server_it = xds_load_report_server_map_ - .emplace(xds_server.Key(), LoadReportServer()) - .first; - if (server_it->second.xds_channel == nullptr) { - server_it->second.xds_channel = GetOrCreateXdsChannelLocked( - xds_server, "load report map (drop stats)"); - } - auto load_report_it = server_it->second.load_report_map - .emplace(std::move(key), LoadReportState()) - .first; - LoadReportState& load_report_state = load_report_it->second; - if (load_report_state.drop_stats != nullptr) { - cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero(); - } - if (cluster_drop_stats == nullptr) { - if (load_report_state.drop_stats != nullptr) { - load_report_state.deleted_drop_stats += - load_report_state.drop_stats->GetSnapshotAndReset(); - } - cluster_drop_stats = MakeRefCounted( - Ref(DEBUG_LOCATION, "DropStats"), server_it->first /*xds_server*/, - load_report_it->first.first /*cluster_name*/, - load_report_it->first.second /*eds_service_name*/); - load_report_state.drop_stats = cluster_drop_stats.get(); - } - server_it->second.xds_channel->MaybeStartLrsCall(); - } - work_serializer_.DrainQueue(); - return cluster_drop_stats; -} - -void XdsClient::RemoveClusterDropStats( - absl::string_view xds_server_key, absl::string_view cluster_name, - absl::string_view eds_service_name, - XdsClusterDropStats* cluster_drop_stats) { - MutexLock lock(&mu_); - auto server_it = xds_load_report_server_map_.find(xds_server_key); - if (server_it == xds_load_report_server_map_.end()) return; - auto load_report_it = server_it->second.load_report_map.find( - std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (load_report_it == server_it->second.load_report_map.end()) return; - LoadReportState& load_report_state = load_report_it->second; - if (load_report_state.drop_stats == cluster_drop_stats) { - // Record final snapshot in deleted_drop_stats, which will be - // added to the next load report. - load_report_state.deleted_drop_stats += - load_report_state.drop_stats->GetSnapshotAndReset(); - load_report_state.drop_stats = nullptr; - } -} - -RefCountedPtr XdsClient::AddClusterLocalityStats( - const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, - absl::string_view eds_service_name, - RefCountedPtr locality) { - auto key = - std::make_pair(std::string(cluster_name), std::string(eds_service_name)); - RefCountedPtr cluster_locality_stats; - { - MutexLock lock(&mu_); - // We jump through some hoops here to make sure that the - // absl::string_views stored in the XdsClusterDropStats object point - // to the strings in the xds_load_report_server_map_ keys, so that - // they have the same lifetime. - auto server_it = xds_load_report_server_map_ - .emplace(xds_server.Key(), LoadReportServer()) - .first; - if (server_it->second.xds_channel == nullptr) { - server_it->second.xds_channel = GetOrCreateXdsChannelLocked( - xds_server, "load report map (locality stats)"); - } - auto load_report_it = server_it->second.load_report_map - .emplace(std::move(key), LoadReportState()) - .first; - LoadReportState& load_report_state = load_report_it->second; - LoadReportState::LocalityState& locality_state = - load_report_state.locality_stats[locality]; - if (locality_state.locality_stats != nullptr) { - cluster_locality_stats = locality_state.locality_stats->RefIfNonZero(); - } - if (cluster_locality_stats == nullptr) { - if (locality_state.locality_stats != nullptr) { - locality_state.deleted_locality_stats += - locality_state.locality_stats->GetSnapshotAndReset(); - } - cluster_locality_stats = MakeRefCounted( - Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*xds_server*/, - load_report_it->first.first /*cluster_name*/, - load_report_it->first.second /*eds_service_name*/, - std::move(locality)); - locality_state.locality_stats = cluster_locality_stats.get(); - } - server_it->second.xds_channel->MaybeStartLrsCall(); - } - work_serializer_.DrainQueue(); - return cluster_locality_stats; -} - -void XdsClient::RemoveClusterLocalityStats( - absl::string_view xds_server_key, absl::string_view cluster_name, - absl::string_view eds_service_name, - const RefCountedPtr& locality, - XdsClusterLocalityStats* cluster_locality_stats) { - MutexLock lock(&mu_); - auto server_it = xds_load_report_server_map_.find(xds_server_key); - if (server_it == xds_load_report_server_map_.end()) return; - auto load_report_it = server_it->second.load_report_map.find( - std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (load_report_it == server_it->second.load_report_map.end()) return; - LoadReportState& load_report_state = load_report_it->second; - auto locality_it = load_report_state.locality_stats.find(locality); - if (locality_it == load_report_state.locality_stats.end()) return; - LoadReportState::LocalityState& locality_state = locality_it->second; - if (locality_state.locality_stats == cluster_locality_stats) { - // Record final snapshot in deleted_locality_stats, which will be - // added to the next load report. - locality_state.deleted_locality_stats += - locality_state.locality_stats->GetSnapshotAndReset(); - locality_state.locality_stats = nullptr; - } -} - void XdsClient::ResetBackoff() { MutexLock lock(&mu_); for (auto& p : xds_channel_map_) { @@ -2036,86 +1550,6 @@ void XdsClient::NotifyWatchersOnResourceDoesNotExist( DEBUG_LOCATION); } -XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( - const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, - const std::set& clusters) { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << this << "] start building load report"; - XdsApi::ClusterLoadReportMap snapshot_map; - auto server_it = xds_load_report_server_map_.find(xds_server.Key()); - if (server_it == xds_load_report_server_map_.end()) return snapshot_map; - auto& load_report_map = server_it->second.load_report_map; - for (auto load_report_it = load_report_map.begin(); - load_report_it != load_report_map.end();) { - // Cluster key is cluster and EDS service name. - const auto& cluster_key = load_report_it->first; - LoadReportState& load_report = load_report_it->second; - // If the CDS response for a cluster indicates to use LRS but the - // LRS server does not say that it wants reports for this cluster, - // then we'll have stats objects here whose data we're not going to - // include in the load report. However, we still need to clear out - // the data from the stats objects, so that if the LRS server starts - // asking for the data in the future, we don't incorrectly include - // data from previous reporting intervals in that future report. - const bool record_stats = - send_all_clusters || clusters.find(cluster_key.first) != clusters.end(); - XdsApi::ClusterLoadReport snapshot; - // Aggregate drop stats. - snapshot.dropped_requests = std::move(load_report.deleted_drop_stats); - if (load_report.drop_stats != nullptr) { - snapshot.dropped_requests += - load_report.drop_stats->GetSnapshotAndReset(); - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << this << "] cluster=" << cluster_key.first - << " eds_service_name=" << cluster_key.second - << " drop_stats=" << load_report.drop_stats; - } - // Aggregate locality stats. - for (auto it = load_report.locality_stats.begin(); - it != load_report.locality_stats.end();) { - const RefCountedPtr& locality_name = it->first; - auto& locality_state = it->second; - XdsClusterLocalityStats::Snapshot& locality_snapshot = - snapshot.locality_stats[locality_name]; - locality_snapshot = std::move(locality_state.deleted_locality_stats); - if (locality_state.locality_stats != nullptr) { - locality_snapshot += - locality_state.locality_stats->GetSnapshotAndReset(); - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << this - << "] cluster=" << cluster_key.first.c_str() - << " eds_service_name=" << cluster_key.second.c_str() - << " locality=" << locality_name->human_readable_string().c_str() - << " locality_stats=" << locality_state.locality_stats; - } - // If the only thing left in this entry was final snapshots from - // deleted locality stats objects, remove the entry. - if (locality_state.locality_stats == nullptr) { - it = load_report.locality_stats.erase(it); - } else { - ++it; - } - } - // Compute load report interval. - const Timestamp now = Timestamp::Now(); - snapshot.load_report_interval = now - load_report.last_report_time; - load_report.last_report_time = now; - // Record snapshot. - if (record_stats) { - snapshot_map[cluster_key] = std::move(snapshot); - } - // If the only thing left in this entry was final snapshots from - // deleted stats objects, remove the entry. - if (load_report.locality_stats.empty() && - load_report.drop_stats == nullptr) { - load_report_it = load_report_map.erase(load_report_it); - } else { - ++load_report_it; - } - } - return snapshot_map; -} - namespace { google_protobuf_Timestamp* EncodeTimestamp(Timestamp value, upb_Arena* arena) { diff --git a/src/core/xds/xds_client/xds_client.h b/src/core/xds/xds_client/xds_client.h index 50d5c11c0a2..7bd3cad12c6 100644 --- a/src/core/xds/xds_client/xds_client.h +++ b/src/core/xds/xds_client/xds_client.h @@ -92,14 +92,6 @@ class XdsClient : public DualRefCounted { Duration resource_request_timeout = Duration::Seconds(15)); ~XdsClient() override; - const XdsBootstrap& bootstrap() const { - return *bootstrap_; // ctor asserts that it is non-null - } - - XdsTransportFactory* transport_factory() const { - return transport_factory_.get(); - } - // Start and cancel watch for a resource. // // The XdsClient takes ownership of the watcher, but the caller may @@ -126,30 +118,17 @@ class XdsClient : public DualRefCounted { ResourceWatcherInterface* watcher, bool delay_unsubscription = false); - // Adds and removes drop stats for cluster_name and eds_service_name. - RefCountedPtr AddClusterDropStats( - const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, - absl::string_view eds_service_name); - void RemoveClusterDropStats(absl::string_view xds_server, - absl::string_view cluster_name, - absl::string_view eds_service_name, - XdsClusterDropStats* cluster_drop_stats); - - // Adds and removes locality stats for cluster_name and eds_service_name - // for the specified locality. - RefCountedPtr AddClusterLocalityStats( - const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, - absl::string_view eds_service_name, - RefCountedPtr locality); - void RemoveClusterLocalityStats( - absl::string_view xds_server, absl::string_view cluster_name, - absl::string_view eds_service_name, - const RefCountedPtr& locality, - XdsClusterLocalityStats* cluster_locality_stats); - // Resets connection backoff state. virtual void ResetBackoff(); + const XdsBootstrap& bootstrap() const { + return *bootstrap_; // ctor asserts that it is non-null + } + + XdsTransportFactory* transport_factory() const { + return transport_factory_.get(); + } + grpc_event_engine::experimental::EventEngine* engine() { return engine_.get(); } @@ -212,7 +191,6 @@ class XdsClient : public DualRefCounted { class RetryableCall; class AdsCall; - class LrsCall; XdsChannel(WeakRefCountedPtr xds_client, const XdsBootstrap::XdsServer& server); @@ -220,13 +198,9 @@ class XdsClient : public DualRefCounted { XdsClient* xds_client() const { return xds_client_.get(); } AdsCall* ads_call() const; - LrsCall* lrs_call() const; void ResetBackoff(); - void MaybeStartLrsCall(); - void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - // Returns non-OK if there has been an error since the last time the // ADS stream saw a response. const absl::Status& status() const { return status_; } @@ -272,7 +246,6 @@ class XdsClient : public DualRefCounted { // The retryable ADS and LRS calls. OrphanablePtr> ads_call_; - OrphanablePtr> lrs_call_; // Stores the most recent accepted resource version for each resource type. std::map @@ -296,30 +269,6 @@ class XdsClient : public DualRefCounted { resource_map; }; - struct LoadReportState { - struct LocalityState { - XdsClusterLocalityStats* locality_stats = nullptr; - XdsClusterLocalityStats::Snapshot deleted_locality_stats; - }; - - XdsClusterDropStats* drop_stats = nullptr; - XdsClusterDropStats::Snapshot deleted_drop_stats; - std::map, LocalityState, - XdsLocalityName::Less> - locality_stats; - Timestamp last_report_time = Timestamp::Now(); - }; - - // Load report data. - using LoadReportMap = std::map< - std::pair, - LoadReportState>; - - struct LoadReportServer { - RefCountedPtr xds_channel; - LoadReportMap load_report_map; - }; - // Sends an error notification to a specific set of watchers. void NotifyWatchersOnErrorLocked( const std::map { const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + bool HasUncachedResources(const AuthorityState& authority_state); + absl::StatusOr ParseXdsResourceName( absl::string_view name, const XdsResourceType* type); static std::string ConstructFullXdsResourceName( absl::string_view authority, absl::string_view resource_type, const XdsResourceKey& key); - XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( - const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, - const std::set& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); RefCountedPtr GetOrCreateXdsChannelLocked( const XdsBootstrap::XdsServer& server, const char* reason) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - bool HasUncachedResources(const AuthorityState& authority_state); std::shared_ptr bootstrap_; RefCountedPtr transport_factory_; @@ -375,9 +322,6 @@ class XdsClient : public DualRefCounted { std::map authority_state_map_ ABSL_GUARDED_BY(mu_); - std::map> - xds_load_report_server_map_ ABSL_GUARDED_BY(mu_); - // Stores started watchers whose resource name was not parsed successfully, // waiting to be cancelled or reset in Orphan(). std::map> diff --git a/src/core/xds/xds_client/xds_client_stats.cc b/src/core/xds/xds_client/xds_client_stats.cc deleted file mode 100644 index 3219b224a9e..00000000000 --- a/src/core/xds/xds_client/xds_client_stats.cc +++ /dev/null @@ -1,164 +0,0 @@ -// -// -// Copyright 2018 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// - -#include "src/core/xds/xds_client/xds_client_stats.h" - -#include "absl/log/log.h" - -#include - -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/xds/xds_client/xds_client.h" - -namespace grpc_core { - -namespace { - -uint64_t GetAndResetCounter(std::atomic* from) { - return from->exchange(0, std::memory_order_relaxed); -} - -} // namespace - -// -// XdsClusterDropStats -// - -XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr xds_client, - absl::string_view lrs_server, - absl::string_view cluster_name, - absl::string_view eds_service_name) - : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) - ? "XdsClusterDropStats" - : nullptr), - xds_client_(std::move(xds_client)), - lrs_server_(lrs_server), - cluster_name_(cluster_name), - eds_service_name_(eds_service_name) { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client_.get() << "] created drop stats " << this - << " for {" << lrs_server_ << ", " << cluster_name_ << ", " - << eds_service_name_ << "}"; -} - -XdsClusterDropStats::~XdsClusterDropStats() { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client_.get() << "] destroying drop stats " - << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " - << eds_service_name_ << "}"; - xds_client_->RemoveClusterDropStats(lrs_server_, cluster_name_, - eds_service_name_, this); - xds_client_.reset(DEBUG_LOCATION, "DropStats"); -} - -XdsClusterDropStats::Snapshot XdsClusterDropStats::GetSnapshotAndReset() { - Snapshot snapshot; - snapshot.uncategorized_drops = GetAndResetCounter(&uncategorized_drops_); - MutexLock lock(&mu_); - snapshot.categorized_drops = std::move(categorized_drops_); - return snapshot; -} - -void XdsClusterDropStats::AddUncategorizedDrops() { - uncategorized_drops_.fetch_add(1); -} - -void XdsClusterDropStats::AddCallDropped(const std::string& category) { - MutexLock lock(&mu_); - ++categorized_drops_[category]; -} - -// -// XdsClusterLocalityStats -// - -XdsClusterLocalityStats::XdsClusterLocalityStats( - RefCountedPtr xds_client, absl::string_view lrs_server, - absl::string_view cluster_name, absl::string_view eds_service_name, - RefCountedPtr name) - : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) - ? "XdsClusterLocalityStats" - : nullptr), - xds_client_(std::move(xds_client)), - lrs_server_(lrs_server), - cluster_name_(cluster_name), - eds_service_name_(eds_service_name), - name_(std::move(name)) { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client_.get() << "] created locality stats " - << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " - << eds_service_name_ << ", " - << (name_ == nullptr ? "" : name_->human_readable_string().c_str()) - << "}"; -} - -XdsClusterLocalityStats::~XdsClusterLocalityStats() { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << xds_client_.get() << "] destroying locality stats " - << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " - << eds_service_name_ << ", " - << (name_ == nullptr ? "" : name_->human_readable_string().c_str()) - << "}"; - xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_, - eds_service_name_, name_, this); - xds_client_.reset(DEBUG_LOCATION, "LocalityStats"); -} - -XdsClusterLocalityStats::Snapshot -XdsClusterLocalityStats::GetSnapshotAndReset() { - Snapshot snapshot; - for (auto& percpu_stats : stats_) { - Snapshot percpu_snapshot = { - GetAndResetCounter(&percpu_stats.total_successful_requests), - // Don't reset total_requests_in_progress because it's - // not related to a single reporting interval. - percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed), - GetAndResetCounter(&percpu_stats.total_error_requests), - GetAndResetCounter(&percpu_stats.total_issued_requests), - {}}; - { - MutexLock lock(&percpu_stats.backend_metrics_mu); - percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics); - } - snapshot += percpu_snapshot; - } - return snapshot; -} - -void XdsClusterLocalityStats::AddCallStarted() { - Stats& stats = stats_.this_cpu(); - stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed); - stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed); -} - -void XdsClusterLocalityStats::AddCallFinished( - const std::map* named_metrics, bool fail) { - Stats& stats = stats_.this_cpu(); - std::atomic& to_increment = - fail ? stats.total_error_requests : stats.total_successful_requests; - to_increment.fetch_add(1, std::memory_order_relaxed); - stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel); - if (named_metrics == nullptr) return; - MutexLock lock(&stats.backend_metrics_mu); - for (const auto& m : *named_metrics) { - stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second}; - } -} - -} // namespace grpc_core diff --git a/src/core/xds/xds_client/xds_client_stats.h b/src/core/xds/xds_client/xds_client_stats.h index 291a9e25718..6503916198d 100644 --- a/src/core/xds/xds_client/xds_client_stats.h +++ b/src/core/xds/xds_client/xds_client_stats.h @@ -1,3 +1,6 @@ +// FIXME: rename to xds_locality.h + + // // // Copyright 2018 gRPC authors. @@ -19,33 +22,21 @@ #ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H #define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H -#include -#include -#include #include #include -#include "absl/base/thread_annotations.h" #include "absl/strings/str_format.h" #include "absl/strings/string_view.h" -#include - -#include "src/core/lib/gprpp/per_cpu.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/ref_counted_string.h" #include "src/core/resolver/endpoint_addresses.h" -#include "src/core/telemetry/call_tracer.h" #include "src/core/util/useful.h" -#include "src/core/xds/xds_client/xds_bootstrap.h" namespace grpc_core { -// Forward declaration to avoid circular dependency. -class XdsClient; - -// Locality name. +// An xDS locality name. class XdsLocalityName final : public RefCounted { public: struct Less { @@ -110,149 +101,6 @@ class XdsLocalityName final : public RefCounted { RefCountedStringValue human_readable_string_; }; -// Drop stats for an xds cluster. -class XdsClusterDropStats final : public RefCounted { - public: - // The total number of requests dropped for any reason is the sum of - // uncategorized_drops, and dropped_requests map. - using CategorizedDropsMap = std::map; - struct Snapshot { - uint64_t uncategorized_drops = 0; - // The number of requests dropped for the specific drop categories - // outlined in the drop_overloads field in the EDS response. - CategorizedDropsMap categorized_drops; - - Snapshot& operator+=(const Snapshot& other) { - uncategorized_drops += other.uncategorized_drops; - for (const auto& p : other.categorized_drops) { - categorized_drops[p.first] += p.second; - } - return *this; - } - - bool IsZero() const { - if (uncategorized_drops != 0) return false; - for (const auto& p : categorized_drops) { - if (p.second != 0) return false; - } - return true; - } - }; - - XdsClusterDropStats(RefCountedPtr xds_client, - absl::string_view lrs_server, - absl::string_view cluster_name, - absl::string_view eds_service_name); - ~XdsClusterDropStats() override; - - // Returns a snapshot of this instance and resets all the counters. - Snapshot GetSnapshotAndReset(); - - void AddUncategorizedDrops(); - void AddCallDropped(const std::string& category); - - private: - RefCountedPtr xds_client_; - absl::string_view lrs_server_; - absl::string_view cluster_name_; - absl::string_view eds_service_name_; - std::atomic uncategorized_drops_{0}; - // Protects categorized_drops_. A mutex is necessary because the length of - // dropped_requests can be accessed by both the picker (from data plane - // mutex) and the load reporting thread (from the control plane combiner). - Mutex mu_; - CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_); -}; - -// Locality stats for an xds cluster. -class XdsClusterLocalityStats final - : public RefCounted { - public: - struct BackendMetric { - uint64_t num_requests_finished_with_metric = 0; - double total_metric_value = 0; - - BackendMetric& operator+=(const BackendMetric& other) { - num_requests_finished_with_metric += - other.num_requests_finished_with_metric; - total_metric_value += other.total_metric_value; - return *this; - } - - bool IsZero() const { - return num_requests_finished_with_metric == 0 && total_metric_value == 0; - } - }; - - struct Snapshot { - uint64_t total_successful_requests = 0; - uint64_t total_requests_in_progress = 0; - uint64_t total_error_requests = 0; - uint64_t total_issued_requests = 0; - std::map backend_metrics; - - Snapshot& operator+=(const Snapshot& other) { - total_successful_requests += other.total_successful_requests; - total_requests_in_progress += other.total_requests_in_progress; - total_error_requests += other.total_error_requests; - total_issued_requests += other.total_issued_requests; - for (const auto& p : other.backend_metrics) { - backend_metrics[p.first] += p.second; - } - return *this; - } - - bool IsZero() const { - if (total_successful_requests != 0 || total_requests_in_progress != 0 || - total_error_requests != 0 || total_issued_requests != 0) { - return false; - } - for (const auto& p : backend_metrics) { - if (!p.second.IsZero()) return false; - } - return true; - } - }; - - XdsClusterLocalityStats(RefCountedPtr xds_client, - absl::string_view lrs_server, - absl::string_view cluster_name, - absl::string_view eds_service_name, - RefCountedPtr name); - ~XdsClusterLocalityStats() override; - - // Returns a snapshot of this instance and resets all the counters. - Snapshot GetSnapshotAndReset(); - - void AddCallStarted(); - void AddCallFinished(const std::map* named_metrics, - bool fail = false); - - XdsLocalityName* locality_name() const { return name_.get(); } - - private: - struct Stats { - std::atomic total_successful_requests{0}; - std::atomic total_requests_in_progress{0}; - std::atomic total_error_requests{0}; - std::atomic total_issued_requests{0}; - - // Protects backend_metrics. A mutex is necessary because the length of - // backend_metrics_ can be accessed by both the callback intercepting the - // call's recv_trailing_metadata and the load reporting thread. - Mutex backend_metrics_mu; - std::map backend_metrics - ABSL_GUARDED_BY(backend_metrics_mu); - }; - - RefCountedPtr xds_client_; - absl::string_view lrs_server_; - absl::string_view cluster_name_; - absl::string_view eds_service_name_; - RefCountedPtr name_; - PerCpu stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)}; -}; - } // namespace grpc_core #endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H