XdsClient: convert timers to use EE API (#30189)

* XdsClient: convert timers to use EE API

* fix build

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* code review comments

* fix build

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30220/head
Mark D. Roth 2 years ago committed by GitHub
parent d43c904495
commit 936f4a21f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 221
      src/core/ext/xds/xds_client.cc

@ -3817,6 +3817,7 @@ grpc_cc_library(
"channel_fwd",
"config",
"debug_location",
"default_event_engine_factory_hdrs",
"envoy_admin_upb",
"envoy_config_cluster_upb",
"envoy_config_cluster_upbdefs",

@ -31,7 +31,9 @@
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
@ -40,13 +42,12 @@
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/uri/uri_parser.h"
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -57,6 +58,9 @@
namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
TraceFlag grpc_xds_client_trace(false, "xds_client");
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
@ -72,9 +76,12 @@ class XdsClient::ChannelState::RetryableCall
public:
explicit RetryableCall(WeakRefCountedPtr<ChannelState> chand);
void Orphan() override;
// Disable thread-safety analysis because this method is called via
// OrphanablePtr<>, but there's no way to pass the lock annotation
// through there.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
void OnCallFinishedLocked();
void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
T* calld() const { return calld_.get(); }
ChannelState* chand() const { return chand_.get(); }
@ -83,9 +90,9 @@ class XdsClient::ChannelState::RetryableCall
private:
void StartNewCallLocked();
void StartRetryTimerLocked();
static void OnRetryTimer(void* arg, grpc_error_handle error);
void OnRetryTimerLocked(grpc_error_handle error);
void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void OnRetryTimer();
// The wrapped xds call that talks to the xds server. It's instantiated
// every time we start a new call. It's null during call retry backoff.
@ -95,9 +102,8 @@ class XdsClient::ChannelState::RetryableCall
// Retry state.
BackOff backoff_;
grpc_timer retry_timer_;
grpc_closure on_retry_timer_;
bool retry_timer_callback_pending_ = false;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&XdsClient::mu_);
bool shutting_down_ = false;
};
@ -163,12 +169,12 @@ class XdsClient::ChannelState::AdsCallState
class ResourceTimer : public InternallyRefCounted<ResourceTimer> {
public:
ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
: type_(type), name_(name) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
: type_(type), name_(name) {}
void Orphan() override {
// Disable thread-safety analysis because this method is called via
// OrphanablePtr<>, but there's no way to pass the lock annotation
// through there.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
MaybeCancelTimer();
Unref(DEBUG_LOCATION, "Orphan");
}
@ -188,15 +194,16 @@ class XdsClient::ChannelState::AdsCallState
if (state.resource != nullptr) return;
// Start timer.
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
grpc_timer_init(
&timer_,
ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
&timer_callback_);
timer_handle_ = GetDefaultEventEngine()->RunAfter(
ads_calld_->xds_client()->request_timeout_,
[self = Ref(DEBUG_LOCATION, "timer")]() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnTimer();
});
}
void MaybeCancelTimer() {
void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
// If the timer hasn't been started yet, make sure we don't start
// it later. This can happen if the last watch for an LDS or CDS
// resource is cancelled and then restarted, both while an ADS
@ -209,57 +216,48 @@ class XdsClient::ChannelState::AdsCallState
// For details, see https://github.com/grpc/grpc/issues/29583.
// TODO(roth): Find a way to write a test for this case.
timer_start_needed_ = false;
if (timer_pending_) {
grpc_timer_cancel(&timer_);
timer_pending_ = false;
if (timer_handle_.has_value()) {
GetDefaultEventEngine()->Cancel(*timer_handle_);
timer_handle_.reset();
}
}
private:
static void OnTimer(void* arg, grpc_error_handle error) {
ResourceTimer* self = static_cast<ResourceTimer*>(arg);
void OnTimer() {
{
MutexLock lock(&self->ads_calld_->xds_client()->mu_);
self->OnTimerLocked(GRPC_ERROR_REF(error));
}
self->ads_calld_->xds_client()->work_serializer_.DrainQueue();
self->ads_calld_.reset();
self->Unref(DEBUG_LOCATION, "timer");
}
void OnTimerLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (GRPC_ERROR_IS_NONE(error) && timer_pending_) {
timer_pending_ = false;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: timeout obtaining resource "
"{type=%s name=%s} from xds server",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri.c_str(),
std::string(type_->type_url()).c_str(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
.c_str());
MutexLock lock(&ads_calld_->xds_client()->mu_);
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: timeout obtaining resource "
"{type=%s name=%s} from xds server",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri.c_str(),
std::string(type_->type_url()).c_str(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
.c_str());
}
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers);
}
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers);
}
GRPC_ERROR_UNREF(error);
ads_calld_->xds_client()->work_serializer_.DrainQueue();
ads_calld_.reset();
}
const XdsResourceType* type_;
const XdsResourceName name_;
RefCountedPtr<AdsCallState> ads_calld_;
bool timer_start_needed_ = true;
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
bool timer_start_needed_ ABSL_GUARDED_BY(&XdsClient::mu_) = true;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&XdsClient::mu_);
};
class StreamEventHandler
@ -364,21 +362,20 @@ class XdsClient::ChannelState::LrsCallState
public:
Reporter(RefCountedPtr<LrsCallState> parent, Duration report_interval)
: parent_(std::move(parent)), report_interval_(report_interval) {
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
grpc_schedule_on_exec_ctx);
ScheduleNextReportLocked();
}
void Orphan() override;
// Disable thread-safety analysis because this method is called via
// OrphanablePtr<>, but there's no way to pass the lock annotation
// through there.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
void OnReportDoneLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
private:
void ScheduleNextReportLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnNextReportTimer(void* arg, grpc_error_handle error);
bool OnNextReportTimerLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool OnNextReportTimer();
bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool IsCurrentReporterOnCall() const {
@ -392,9 +389,8 @@ class XdsClient::ChannelState::LrsCallState
// The load reporting state.
const Duration report_interval_;
bool last_report_counters_were_zero_ = false;
bool next_report_timer_callback_pending_ = false;
grpc_timer next_report_timer_;
grpc_closure on_next_report_timer_;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&XdsClient::mu_);
};
void OnRequestSent(bool ok);
@ -566,9 +562,6 @@ XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(Duration::Seconds(
GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
// Closure Initialization
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx);
StartNewCallLocked();
}
@ -576,7 +569,10 @@ template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
shutting_down_ = true;
calld_.reset();
if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
if (timer_handle_.has_value()) {
GetDefaultEventEngine()->Cancel(*timer_handle_);
timer_handle_.reset();
}
this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
@ -608,36 +604,30 @@ template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
if (shutting_down_) return;
const Timestamp next_attempt_time = backoff_.NextAttemptTime();
const Duration timeout =
std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero());
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
Duration timeout =
std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero());
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: call attempt failed; "
"retry timer will fire in %" PRId64 "ms.",
chand()->xds_client(), chand()->server_.server_uri.c_str(),
timeout.millis());
}
this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
retry_timer_callback_pending_ = true;
timer_handle_ = GetDefaultEventEngine()->RunAfter(
timeout,
[self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnRetryTimer();
});
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
void* arg, grpc_error_handle error) {
RetryableCall* calld = static_cast<RetryableCall*>(arg);
{
MutexLock lock(&calld->chand_->xds_client()->mu_);
calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
}
calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
grpc_error_handle error) {
retry_timer_callback_pending_ = false;
if (!shutting_down_ && GRPC_ERROR_IS_NONE(error)) {
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer() {
MutexLock lock(&chand_->xds_client()->mu_);
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: retry timer fired (retryable "
@ -646,7 +636,6 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
}
StartNewCallLocked();
}
GRPC_ERROR_UNREF(error);
}
//
@ -1122,38 +1111,30 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
//
void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
if (next_report_timer_callback_pending_) {
grpc_timer_cancel(&next_report_timer_);
if (timer_handle_.has_value() &&
GetDefaultEventEngine()->Cancel(*timer_handle_)) {
timer_handle_.reset();
Unref(DEBUG_LOCATION, "Orphan");
}
}
void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked() {
const Timestamp next_report_time = ExecCtx::Get()->Now() + report_interval_;
grpc_timer_init(&next_report_timer_, next_report_time,
&on_next_report_timer_);
next_report_timer_callback_pending_ = true;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
void* arg, grpc_error_handle error) {
Reporter* self = static_cast<Reporter*>(arg);
bool done;
{
MutexLock lock(&self->xds_client()->mu_);
done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
}
if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
timer_handle_ = GetDefaultEventEngine()->RunAfter(report_interval_, [this]() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
if (OnNextReportTimer()) {
Unref(DEBUG_LOCATION, "OnNextReportTimer()");
}
});
}
bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
grpc_error_handle error) {
next_report_timer_callback_pending_ = false;
if (!GRPC_ERROR_IS_NONE(error) || !IsCurrentReporterOnCall()) {
GRPC_ERROR_UNREF(error);
return true;
}
return SendReportLocked();
bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer() {
MutexLock lock(&xds_client()->mu_);
timer_handle_.reset();
if (!IsCurrentReporterOnCall()) return true;
SendReportLocked();
return false;
}
namespace {
@ -1208,7 +1189,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() {
// method will be called even though it was for a completion started
// by the old reporter. In that case, the timer will be pending, so
// we just ignore the completion and wait for the timer to fire.
if (next_report_timer_callback_pending_) return;
if (timer_handle_.has_value()) return;
// If there are no more registered stats to report, cancel the call.
auto it =
xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_);

Loading…
Cancel
Save