Revert "EventEngine::RunAfter: GrpcLb" (#32262)

* Revert "EventEngine::RunAfter: GrpcLb (#30043)"

This reverts commit b19604ea60.

* iwyu
pull/32272/head
Mark D. Roth 2 years ago committed by GitHub
parent 439c7518a3
commit 95b2f0c8ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 167
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -3620,6 +3620,7 @@ grpc_cc_library(
"//:grpc_resolver_fake",
"//:grpc_security_base",
"//:grpc_trace",
"//:iomgr_timer",
"//:orphanable",
"//:promise",
"//:protobuf_duration_upb",

@ -65,7 +65,6 @@
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -123,6 +122,7 @@
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@ -495,8 +495,7 @@ class GrpcLb : public LoadBalancingPolicy {
"entering fallback mode",
parent_.get(), status.ToString().c_str());
parent_->fallback_at_startup_checks_pending_ = false;
parent_->channel_control_helper()->GetEventEngine()->Cancel(
*parent_->lb_fallback_timer_handle_);
grpc_timer_cancel(&parent_->lb_fallback_timer_);
parent_->fallback_mode_ = true;
parent_->CreateOrUpdateChildPolicyLocked();
// Cancel the watch, since we don't care about the channel state once we
@ -517,12 +516,14 @@ class GrpcLb : public LoadBalancingPolicy {
// Methods for dealing with fallback state.
void MaybeEnterFallbackModeAfterStartup();
void OnFallbackTimerLocked();
static void OnFallbackTimer(void* arg, grpc_error_handle error);
void OnFallbackTimerLocked(grpc_error_handle error);
// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
void StartBalancerCallRetryTimerLocked();
void OnBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error);
void OnBalancerCallRetryTimerLocked(grpc_error_handle error);
// Methods for dealing with the child policy.
ChannelArgs CreateChildPolicyArgsLocked(
@ -535,7 +536,8 @@ class GrpcLb : public LoadBalancingPolicy {
void CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel);
void StartSubchannelCacheTimerLocked();
void OnSubchannelCacheTimerLocked();
static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error);
void OnSubchannelCacheTimerLocked(grpc_error_handle error);
// Who the client is trying to communicate with.
std::string server_name_;
@ -566,7 +568,9 @@ class GrpcLb : public LoadBalancingPolicy {
const Duration lb_call_timeout_;
// Balancer call retry state.
BackOff lb_call_backoff_;
absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_;
bool retry_timer_callback_pending_ = false;
grpc_timer lb_call_retry_timer_;
grpc_closure lb_on_call_retry_;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
@ -584,7 +588,8 @@ class GrpcLb : public LoadBalancingPolicy {
// we have not received a serverlist from the balancer.
const Duration fallback_at_startup_timeout_;
bool fallback_at_startup_checks_pending_ = false;
absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
@ -596,7 +601,9 @@ class GrpcLb : public LoadBalancingPolicy {
std::map<Timestamp /*deletion time*/,
std::vector<RefCountedPtr<SubchannelInterface>>>
cached_subchannels_;
absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_;
grpc_timer subchannel_cache_timer_;
grpc_closure on_subchannel_cache_timer_;
bool subchannel_cache_timer_pending_ = false;
};
//
@ -1255,8 +1262,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
}
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
// Update the serverlist in the GrpcLb instance. This serverlist
@ -1274,8 +1280,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
grpclb_policy());
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
grpclb_policy()->fallback_mode_ = true;
@ -1342,8 +1347,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
"serverlist; entering fallback mode",
grpclb_policy());
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy()->fallback_mode_ = true;
grpclb_policy()->CreateOrUpdateChildPolicyLocked();
@ -1495,25 +1499,29 @@ GrpcLb::GrpcLb(Args args)
"[grpclb %p] Will use '%s' as the server name for LB request.",
this, server_name_.c_str());
}
// Closure Initialization
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this,
nullptr);
}
void GrpcLb::ShutdownLocked() {
shutting_down_ = true;
lb_calld_.reset();
if (subchannel_cache_timer_handle_.has_value()) {
channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_cache_timer_handle_);
subchannel_cache_timer_handle_.reset();
if (subchannel_cache_timer_pending_) {
subchannel_cache_timer_pending_ = false;
grpc_timer_cancel(&subchannel_cache_timer_);
}
cached_subchannels_.clear();
if (lb_call_retry_timer_handle_.has_value()) {
channel_control_helper()->GetEventEngine()->Cancel(
*lb_call_retry_timer_handle_);
if (retry_timer_callback_pending_) {
grpc_timer_cancel(&lb_call_retry_timer_);
}
if (fallback_at_startup_checks_pending_) {
fallback_at_startup_checks_pending_ = false;
channel_control_helper()->GetEventEngine()->Cancel(
*lb_fallback_timer_handle_);
grpc_timer_cancel(&lb_fallback_timer_);
CancelBalancerChannelConnectivityWatchLocked();
}
if (child_policy_ != nullptr) {
@ -1574,18 +1582,9 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (is_initial_update) {
fallback_at_startup_checks_pending_ = true;
// Start timer.
lb_fallback_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
fallback_at_startup_timeout_,
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "on_fallback_timer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() { self->OnFallbackTimerLocked(); },
DEBUG_LOCATION);
});
Timestamp deadline = Timestamp::Now() + fallback_at_startup_timeout_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
@ -1675,9 +1674,10 @@ void GrpcLb::StartBalancerCallLocked() {
}
void GrpcLb::StartBalancerCallRetryTimerLocked() {
Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now();
Timestamp next_try = lb_call_backoff_.NextAttemptTime();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
Duration timeout = next_try - Timestamp::Now();
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
this, timeout.millis());
@ -1686,30 +1686,33 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
this);
}
}
lb_call_retry_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
timeout,
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() {
self->OnBalancerCallRetryTimerLocked();
},
DEBUG_LOCATION);
});
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
self.release();
retry_timer_callback_pending_ = true;
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
void GrpcLb::OnBalancerCallRetryTimerLocked() {
lb_call_retry_timer_handle_.reset();
if (!shutting_down_ && lb_calld_ == nullptr) {
// Timer closure for the balancer-call retry timer. Runs outside the
// policy's WorkSerializer, so it only re-schedules the real work
// (OnBalancerCallRetryTimerLocked) onto the WorkSerializer, forwarding
// the timer's error/cancellation status unchanged.
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  grpclb_policy->work_serializer()->Run(
      [grpclb_policy, error]() {
        grpclb_policy->OnBalancerCallRetryTimerLocked(error);
      },
      DEBUG_LOCATION);
}
// Runs under the WorkSerializer after the retry timer fires (or is
// cancelled). Restarts the balancer call only when the timer completed
// normally (error.ok()), the policy is not shutting down, and no
// balancer call is currently in flight.
void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) {
  retry_timer_callback_pending_ = false;
  if (!shutting_down_ && error.ok() && lb_calld_ == nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
    }
    StartBalancerCallLocked();
  }
  // Releases the ref taken (as "on_balancer_call_retry_timer") when the
  // retry timer was scheduled.
  Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
//
@ -1734,10 +1737,17 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
}
}
void GrpcLb::OnFallbackTimerLocked() {
// Timer closure for the fallback-at-startup timer. Runs outside the
// policy's WorkSerializer, so it only re-schedules
// OnFallbackTimerLocked onto the WorkSerializer, forwarding the timer's
// error/cancellation status unchanged.
void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  grpclb_policy->work_serializer()->Run(
      [grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); },
      DEBUG_LOCATION);
}
void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (fallback_at_startup_checks_pending_ && !shutting_down_) {
if (fallback_at_startup_checks_pending_ && !shutting_down_ && error.ok()) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
@ -1747,6 +1757,7 @@ void GrpcLb::OnFallbackTimerLocked() {
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
Unref(DEBUG_LOCATION, "on_fallback_timer");
}
//
@ -1769,8 +1780,7 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
lb_policy_args.channel_control_helper = std::make_unique<Helper>(Ref());
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_glb_trace);
@ -1793,10 +1803,9 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback
// backends returned by the resolver. Note that the fallback backend
// list may be empty, in which case the new child policy will fail the
// picks.
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new child policy will fail the picks.
update_args.addresses = fallback_backend_addresses_;
if (fallback_backend_addresses_.ok() &&
fallback_backend_addresses_->empty()) {
@ -1835,32 +1844,28 @@ void GrpcLb::CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel) {
Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
cached_subchannels_[deletion_time].push_back(std::move(subchannel));
if (!subchannel_cache_timer_handle_.has_value()) {
if (!subchannel_cache_timer_pending_) {
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release();
subchannel_cache_timer_pending_ = true;
StartSubchannelCacheTimerLocked();
}
}
void GrpcLb::StartSubchannelCacheTimerLocked() {
GPR_ASSERT(!cached_subchannels_.empty());
subchannel_cache_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
cached_subchannels_.begin()->first - Timestamp::Now(),
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto* self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() mutable {
self->OnSubchannelCacheTimerLocked();
},
DEBUG_LOCATION);
});
grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first,
&on_subchannel_cache_timer_);
}
// Timer closure for the deleted-subchannel cache timer. Runs outside
// the policy's WorkSerializer, so it only re-schedules
// OnSubchannelCacheTimerLocked onto the WorkSerializer, forwarding the
// timer's error/cancellation status unchanged.
void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) {
  auto* self = static_cast<GrpcLb*>(arg);
  self->work_serializer()->Run(
      [self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); },
      DEBUG_LOCATION);
}
void GrpcLb::OnSubchannelCacheTimerLocked() {
if (subchannel_cache_timer_handle_.has_value()) {
subchannel_cache_timer_handle_.reset();
void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
if (subchannel_cache_timer_pending_ && error.ok()) {
auto it = cached_subchannels_.begin();
if (it != cached_subchannels_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
@ -1874,7 +1879,9 @@ void GrpcLb::OnSubchannelCacheTimerLocked() {
StartSubchannelCacheTimerLocked();
return;
}
subchannel_cache_timer_pending_ = false;
}
Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer");
}
//

Loading…
Cancel
Save