From 936f4a21f715551859f981b70e0db3e6950ccf1d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 8 Jul 2022 12:50:11 -0700 Subject: [PATCH] XdsClient: convert timers to use EE API (#30189) * XdsClient: convert timers to use EE API * fix build * Automated change: Fix sanity tests * Automated change: Fix sanity tests * code review comments * fix build Co-authored-by: markdroth --- BUILD | 1 + src/core/ext/xds/xds_client.cc | 221 +++++++++++++++------------------ 2 files changed, 102 insertions(+), 120 deletions(-) diff --git a/BUILD b/BUILD index bef3065958c..298dc127576 100644 --- a/BUILD +++ b/BUILD @@ -3817,6 +3817,7 @@ grpc_cc_library( "channel_fwd", "config", "debug_location", + "default_event_engine_factory_hdrs", "envoy_admin_upb", "envoy_config_cluster_upb", "envoy_config_cluster_upbdefs", diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 5628f64b39c..a03f3cff240 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -31,7 +31,9 @@ #include "absl/strings/str_split.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" +#include "absl/types/optional.h" +#include #include #include @@ -40,13 +42,12 @@ #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/uri/uri_parser.h" #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1 @@ -57,6 +58,9 @@ namespace grpc_core { +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + TraceFlag grpc_xds_client_trace(false, "xds_client"); TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount"); @@ -72,9 +76,12 @@ class XdsClient::ChannelState::RetryableCall public: explicit RetryableCall(WeakRefCountedPtr chand); - void Orphan() override; + // Disable thread-safety analysis because this method is called via + // OrphanablePtr<>, but there's no way to pass the lock annotation + // through there. + void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; - void OnCallFinishedLocked(); + void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); T* calld() const { return calld_.get(); } ChannelState* chand() const { return chand_.get(); } @@ -83,9 +90,9 @@ class XdsClient::ChannelState::RetryableCall private: void StartNewCallLocked(); - void StartRetryTimerLocked(); - static void OnRetryTimer(void* arg, grpc_error_handle error); - void OnRetryTimerLocked(grpc_error_handle error); + void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + + void OnRetryTimer(); // The wrapped xds call that talks to the xds server. It's instantiated // every time we start a new call. It's null during call retry backoff. @@ -95,9 +102,8 @@ class XdsClient::ChannelState::RetryableCall // Retry state. BackOff backoff_; - grpc_timer retry_timer_; - grpc_closure on_retry_timer_; - bool retry_timer_callback_pending_ = false; + absl::optional timer_handle_ + ABSL_GUARDED_BY(&XdsClient::mu_); bool shutting_down_ = false; }; @@ -163,12 +169,12 @@ class XdsClient::ChannelState::AdsCallState class ResourceTimer : public InternallyRefCounted { public: ResourceTimer(const XdsResourceType* type, const XdsResourceName& name) - : type_(type), name_(name) { - GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, - grpc_schedule_on_exec_ctx); - } + : type_(type), name_(name) {} - void Orphan() override { + // Disable thread-safety analysis because this method is called via + // OrphanablePtr<>, but there's no way to pass the lock annotation + // through there. + void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS { MaybeCancelTimer(); Unref(DEBUG_LOCATION, "Orphan"); } @@ -188,15 +194,16 @@ class XdsClient::ChannelState::AdsCallState if (state.resource != nullptr) return; // Start timer. ads_calld_ = std::move(ads_calld); - Ref(DEBUG_LOCATION, "timer").release(); - timer_pending_ = true; - grpc_timer_init( - &timer_, - ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_, - &timer_callback_); + timer_handle_ = GetDefaultEventEngine()->RunAfter( + ads_calld_->xds_client()->request_timeout_, + [self = Ref(DEBUG_LOCATION, "timer")]() { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->OnTimer(); + }); } - void MaybeCancelTimer() { + void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { // If the timer hasn't been started yet, make sure we don't start // it later. This can happen if the last watch for an LDS or CDS // resource is cancelled and then restarted, both while an ADS @@ -209,57 +216,48 @@ class XdsClient::ChannelState::AdsCallState // For details, see https://github.com/grpc/grpc/issues/29583. // TODO(roth): Find a way to write a test for this case. timer_start_needed_ = false; - if (timer_pending_) { - grpc_timer_cancel(&timer_); - timer_pending_ = false; + if (timer_handle_.has_value()) { + GetDefaultEventEngine()->Cancel(*timer_handle_); + timer_handle_.reset(); } } private: - static void OnTimer(void* arg, grpc_error_handle error) { - ResourceTimer* self = static_cast(arg); + void OnTimer() { { - MutexLock lock(&self->ads_calld_->xds_client()->mu_); - self->OnTimerLocked(GRPC_ERROR_REF(error)); - } - self->ads_calld_->xds_client()->work_serializer_.DrainQueue(); - self->ads_calld_.reset(); - self->Unref(DEBUG_LOCATION, "timer"); - } - - void OnTimerLocked(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { - if (GRPC_ERROR_IS_NONE(error) && timer_pending_) { - timer_pending_ = false; - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] xds server %s: timeout obtaining resource " - "{type=%s name=%s} from xds server", - ads_calld_->xds_client(), - ads_calld_->chand()->server_.server_uri.c_str(), - std::string(type_->type_url()).c_str(), - XdsClient::ConstructFullXdsResourceName( - name_.authority, type_->type_url(), name_.key) - .c_str()); + MutexLock lock(&ads_calld_->xds_client()->mu_); + if (timer_handle_.has_value()) { + timer_handle_.reset(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] xds server %s: timeout obtaining resource " + "{type=%s name=%s} from xds server", + ads_calld_->xds_client(), + ads_calld_->chand()->server_.server_uri.c_str(), + std::string(type_->type_url()).c_str(), + XdsClient::ConstructFullXdsResourceName( + name_.authority, type_->type_url(), name_.key) + .c_str()); + } + auto& authority_state = + ads_calld_->xds_client()->authority_state_map_[name_.authority]; + ResourceState& state = authority_state.resource_map[type_][name_.key]; + state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; + ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist( + state.watchers); } - auto& authority_state = - ads_calld_->xds_client()->authority_state_map_[name_.authority]; - ResourceState& state = authority_state.resource_map[type_][name_.key]; - state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist( - state.watchers); } - GRPC_ERROR_UNREF(error); + ads_calld_->xds_client()->work_serializer_.DrainQueue(); + ads_calld_.reset(); } const XdsResourceType* type_; const XdsResourceName name_; RefCountedPtr ads_calld_; - bool timer_start_needed_ = true; - bool timer_pending_ = false; - grpc_timer timer_; - grpc_closure timer_callback_; + bool timer_start_needed_ ABSL_GUARDED_BY(&XdsClient::mu_) = true; + absl::optional timer_handle_ + ABSL_GUARDED_BY(&XdsClient::mu_); }; class StreamEventHandler @@ -364,21 +362,20 @@ class XdsClient::ChannelState::LrsCallState public: Reporter(RefCountedPtr parent, Duration report_interval) : parent_(std::move(parent)), report_interval_(report_interval) { - GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this, - grpc_schedule_on_exec_ctx); ScheduleNextReportLocked(); } - void Orphan() override; + // Disable thread-safety analysis because this method is called via + // OrphanablePtr<>, but there's no way to pass the lock annotation + // through there. + void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; void OnReportDoneLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); private: void ScheduleNextReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - static void OnNextReportTimer(void* arg, grpc_error_handle error); - bool OnNextReportTimerLocked(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + bool OnNextReportTimer(); bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool IsCurrentReporterOnCall() const { @@ -392,9 +389,8 @@ class XdsClient::ChannelState::LrsCallState // The load reporting state. const Duration report_interval_; bool last_report_counters_were_zero_ = false; - bool next_report_timer_callback_pending_ = false; - grpc_timer next_report_timer_; - grpc_closure on_next_report_timer_; + absl::optional timer_handle_ + ABSL_GUARDED_BY(&XdsClient::mu_); }; void OnRequestSent(bool ok); @@ -566,9 +562,6 @@ XdsClient::ChannelState::RetryableCall::RetryableCall( .set_jitter(GRPC_XDS_RECONNECT_JITTER) .set_max_backoff(Duration::Seconds( GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) { - // Closure Initialization - GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, - grpc_schedule_on_exec_ctx); StartNewCallLocked(); } @@ -576,7 +569,10 @@ template void XdsClient::ChannelState::RetryableCall::Orphan() { shutting_down_ = true; calld_.reset(); - if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_); + if (timer_handle_.has_value()) { + GetDefaultEventEngine()->Cancel(*timer_handle_); + timer_handle_.reset(); + } this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned"); } @@ -608,36 +604,30 @@ template void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { if (shutting_down_) return; const Timestamp next_attempt_time = backoff_.NextAttemptTime(); + const Duration timeout = + std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero()); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - Duration timeout = - std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero()); gpr_log(GPR_INFO, "[xds_client %p] xds server %s: call attempt failed; " "retry timer will fire in %" PRId64 "ms.", chand()->xds_client(), chand()->server_.server_uri.c_str(), timeout.millis()); } - this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release(); - grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_); - retry_timer_callback_pending_ = true; + timer_handle_ = GetDefaultEventEngine()->RunAfter( + timeout, + [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->OnRetryTimer(); + }); } template -void XdsClient::ChannelState::RetryableCall::OnRetryTimer( - void* arg, grpc_error_handle error) { - RetryableCall* calld = static_cast(arg); - { - MutexLock lock(&calld->chand_->xds_client()->mu_); - calld->OnRetryTimerLocked(GRPC_ERROR_REF(error)); - } - calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done"); -} - -template -void XdsClient::ChannelState::RetryableCall::OnRetryTimerLocked( - grpc_error_handle error) { - retry_timer_callback_pending_ = false; - if (!shutting_down_ && GRPC_ERROR_IS_NONE(error)) { +void XdsClient::ChannelState::RetryableCall::OnRetryTimer() { + MutexLock lock(&chand_->xds_client()->mu_); + if (timer_handle_.has_value()) { + timer_handle_.reset(); + if (shutting_down_) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: retry timer fired (retryable " @@ -646,7 +636,6 @@ void XdsClient::ChannelState::RetryableCall::OnRetryTimerLocked( } StartNewCallLocked(); } - GRPC_ERROR_UNREF(error); } // @@ -1122,38 +1111,30 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( // void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { - if (next_report_timer_callback_pending_) { - grpc_timer_cancel(&next_report_timer_); + if (timer_handle_.has_value() && + GetDefaultEventEngine()->Cancel(*timer_handle_)) { + timer_handle_.reset(); + Unref(DEBUG_LOCATION, "Orphan"); } } void XdsClient::ChannelState::LrsCallState::Reporter:: ScheduleNextReportLocked() { - const Timestamp next_report_time = ExecCtx::Get()->Now() + report_interval_; - grpc_timer_init(&next_report_timer_, next_report_time, - &on_next_report_timer_); - next_report_timer_callback_pending_ = true; -} - -void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer( - void* arg, grpc_error_handle error) { - Reporter* self = static_cast(arg); - bool done; - { - MutexLock lock(&self->xds_client()->mu_); - done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error)); - } - if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer"); + timer_handle_ = GetDefaultEventEngine()->RunAfter(report_interval_, [this]() { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + if (OnNextReportTimer()) { + Unref(DEBUG_LOCATION, "OnNextReportTimer()"); + } + }); } -bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( - grpc_error_handle error) { - next_report_timer_callback_pending_ = false; - if (!GRPC_ERROR_IS_NONE(error) || !IsCurrentReporterOnCall()) { - GRPC_ERROR_UNREF(error); - return true; - } - return SendReportLocked(); +bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer() { + MutexLock lock(&xds_client()->mu_); + timer_handle_.reset(); + if (!IsCurrentReporterOnCall()) return true; + SendReportLocked(); + return false; } namespace { @@ -1208,7 +1189,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() { // method will be called even though it was for a completion started // by the old reporter. In that case, the timer will be pending, so // we just ignore the completion and wait for the timer to fire. - if (next_report_timer_callback_pending_) return; + if (timer_handle_.has_value()) return; // If there are no more registered stats to report, cancel the call. auto it = xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_);