From e4af983819998ba1e07223d4742aaaf78f2efe95 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Thu, 2 Feb 2023 11:20:42 -0800 Subject: [PATCH] Reland "EventEngine::RunAfter: GrpcLb" (#32262)" (#32269) There was a ~1% flake in grpclb end2end tests that was reproducible in opt builds, manifesting as a hang, usually in a the SingleBalancerTest.Fallback test. Through experimentation, I found that by skipping the death test in the grpclb end2end test suite, the hang was no longer reproducible in 10,000 runs. Similarly, moving this test to the end of the suite, or making it run first (as is the case in this PR) resulted in 0 failures in 3000 runs. It's unclear to me yet why the death test causes things to be unstable in this way. It's clear from the logs that one test does affect the rest, grpc_init is done once for all tests, so all tests utilize the same EventEngine ... until the death test completes, and a new EventEngine is created for the next test. I think this death test is sufficiently artificial that it's fine to change the test ordering itself, and ignore the wonky intermediate state that results from it. Reproducing the flake: ``` tools/bazel --bazelrc=tools/remote_build/linux.bazelrc test \ -c opt \ --test_env=GRPC_TRACE=event_engine \ --runs_per_test=5000 \ --test_output=summary \ test/cpp/end2end/grpclb_end2end_test@poller=epoll1 ``` --- src/core/BUILD | 1 - .../client_channel/lb_policy/grpclb/grpclb.cc | 167 +++++++++--------- test/cpp/end2end/grpclb_end2end_test.cc | 30 ++-- 3 files changed, 97 insertions(+), 101 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index 5e56633af3e..49bd2ca13e3 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3702,7 +3702,6 @@ grpc_cc_library( "//:grpc_resolver_fake", "//:grpc_security_base", "//:grpc_trace", - "//:iomgr_timer", "//:orphanable", "//:protobuf_duration_upb", "//:protobuf_timestamp_upb", diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 934eea66045..2695e53056a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -65,6 +65,7 @@ #include #include #include +#include #include #include @@ -122,7 +123,6 @@ #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/socket_utils.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" @@ -495,7 +495,8 @@ class GrpcLb : public LoadBalancingPolicy { "entering fallback mode", parent_.get(), status.ToString().c_str()); parent_->fallback_at_startup_checks_pending_ = false; - grpc_timer_cancel(&parent_->lb_fallback_timer_); + parent_->channel_control_helper()->GetEventEngine()->Cancel( + *parent_->lb_fallback_timer_handle_); parent_->fallback_mode_ = true; parent_->CreateOrUpdateChildPolicyLocked(); // Cancel the watch, since we don't care about the channel state once we @@ -516,14 +517,12 @@ class GrpcLb : public LoadBalancingPolicy { // Methods for dealing with fallback state. void MaybeEnterFallbackModeAfterStartup(); - static void OnFallbackTimer(void* arg, grpc_error_handle error); - void OnFallbackTimerLocked(grpc_error_handle error); + void OnFallbackTimerLocked(); // Methods for dealing with the balancer call. void StartBalancerCallLocked(); void StartBalancerCallRetryTimerLocked(); - static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error); - void OnBalancerCallRetryTimerLocked(grpc_error_handle error); + void OnBalancerCallRetryTimerLocked(); // Methods for dealing with the child policy. ChannelArgs CreateChildPolicyArgsLocked( @@ -536,8 +535,7 @@ class GrpcLb : public LoadBalancingPolicy { void CacheDeletedSubchannelLocked( RefCountedPtr subchannel); void StartSubchannelCacheTimerLocked(); - static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error); - void OnSubchannelCacheTimerLocked(grpc_error_handle error); + void OnSubchannelCacheTimerLocked(); // Who the client is trying to communicate with. std::string server_name_; @@ -568,9 +566,7 @@ class GrpcLb : public LoadBalancingPolicy { const Duration lb_call_timeout_; // Balancer call retry state. BackOff lb_call_backoff_; - bool retry_timer_callback_pending_ = false; - grpc_timer lb_call_retry_timer_; - grpc_closure lb_on_call_retry_; + absl::optional lb_call_retry_timer_handle_; // The deserialized response from the balancer. May be nullptr until one // such response has arrived. @@ -588,8 +584,7 @@ class GrpcLb : public LoadBalancingPolicy { // we have not received a serverlist from the balancer. const Duration fallback_at_startup_timeout_; bool fallback_at_startup_checks_pending_ = false; - grpc_timer lb_fallback_timer_; - grpc_closure lb_on_fallback_; + absl::optional lb_fallback_timer_handle_; // The child policy to use for the backends. OrphanablePtr child_policy_; @@ -601,9 +596,7 @@ class GrpcLb : public LoadBalancingPolicy { std::map>> cached_subchannels_; - grpc_timer subchannel_cache_timer_; - grpc_closure on_subchannel_cache_timer_; - bool subchannel_cache_timer_pending_ = false; + absl::optional subchannel_cache_timer_handle_; }; // @@ -1262,7 +1255,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() { } if (grpclb_policy()->fallback_at_startup_checks_pending_) { grpclb_policy()->fallback_at_startup_checks_pending_ = false; - grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_); + grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( + *grpclb_policy()->lb_fallback_timer_handle_); grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked(); } // Update the serverlist in the GrpcLb instance. This serverlist @@ -1280,7 +1274,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() { grpclb_policy()); if (grpclb_policy()->fallback_at_startup_checks_pending_) { grpclb_policy()->fallback_at_startup_checks_pending_ = false; - grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_); + grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( + *grpclb_policy()->lb_fallback_timer_handle_); grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked(); } grpclb_policy()->fallback_mode_ = true; @@ -1347,7 +1342,8 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( "serverlist; entering fallback mode", grpclb_policy()); grpclb_policy()->fallback_at_startup_checks_pending_ = false; - grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_); + grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( + *grpclb_policy()->lb_fallback_timer_handle_); grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked(); grpclb_policy()->fallback_mode_ = true; grpclb_policy()->CreateOrUpdateChildPolicyLocked(); @@ -1499,29 +1495,25 @@ GrpcLb::GrpcLb(Args args) "[grpclb %p] Will use '%s' as the server name for LB request.", this, server_name_.c_str()); } - // Closure Initialization - GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this, - nullptr); } void GrpcLb::ShutdownLocked() { shutting_down_ = true; lb_calld_.reset(); - if (subchannel_cache_timer_pending_) { - subchannel_cache_timer_pending_ = false; - grpc_timer_cancel(&subchannel_cache_timer_); + if (subchannel_cache_timer_handle_.has_value()) { + channel_control_helper()->GetEventEngine()->Cancel( + *subchannel_cache_timer_handle_); + subchannel_cache_timer_handle_.reset(); } cached_subchannels_.clear(); - if (retry_timer_callback_pending_) { - grpc_timer_cancel(&lb_call_retry_timer_); + if (lb_call_retry_timer_handle_.has_value()) { + channel_control_helper()->GetEventEngine()->Cancel( + *lb_call_retry_timer_handle_); } if (fallback_at_startup_checks_pending_) { fallback_at_startup_checks_pending_ = false; - grpc_timer_cancel(&lb_fallback_timer_); + channel_control_helper()->GetEventEngine()->Cancel( + *lb_fallback_timer_handle_); CancelBalancerChannelConnectivityWatchLocked(); } if (child_policy_ != nullptr) { @@ -1582,9 +1574,18 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { if (is_initial_update) { fallback_at_startup_checks_pending_ = true; // Start timer. - Timestamp deadline = Timestamp::Now() + fallback_at_startup_timeout_; - Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback - grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); + lb_fallback_timer_handle_ = + channel_control_helper()->GetEventEngine()->RunAfter( + fallback_at_startup_timeout_, + [self = static_cast>( + Ref(DEBUG_LOCATION, "on_fallback_timer"))]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + auto self_ptr = self.get(); + self_ptr->work_serializer()->Run( + [self = std::move(self)]() { self->OnFallbackTimerLocked(); }, + DEBUG_LOCATION); + }); // Start watching the channel's connectivity state. If the channel // goes into state TRANSIENT_FAILURE before the timer fires, we go into // fallback mode even if the fallback timeout has not elapsed. @@ -1674,10 +1675,9 @@ void GrpcLb::StartBalancerCallLocked() { } void GrpcLb::StartBalancerCallRetryTimerLocked() { - Timestamp next_try = lb_call_backoff_.NextAttemptTime(); + Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this); - Duration timeout = next_try - Timestamp::Now(); if (timeout > Duration::Zero()) { gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.", this, timeout.millis()); @@ -1686,33 +1686,30 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() { this); } } - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. - auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); - self.release(); - retry_timer_callback_pending_ = true; - grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); -} - -void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) { - GrpcLb* grpclb_policy = static_cast(arg); - grpclb_policy->work_serializer()->Run( - [grpclb_policy, error]() { - grpclb_policy->OnBalancerCallRetryTimerLocked(error); - }, - DEBUG_LOCATION); + lb_call_retry_timer_handle_ = + channel_control_helper()->GetEventEngine()->RunAfter( + timeout, + [self = static_cast>( + Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"))]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + auto self_ptr = self.get(); + self_ptr->work_serializer()->Run( + [self = std::move(self)]() { + self->OnBalancerCallRetryTimerLocked(); + }, + DEBUG_LOCATION); + }); } -void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) { - retry_timer_callback_pending_ = false; - if (!shutting_down_ && error.ok() && lb_calld_ == nullptr) { +void GrpcLb::OnBalancerCallRetryTimerLocked() { + lb_call_retry_timer_handle_.reset(); + if (!shutting_down_ && lb_calld_ == nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this); } StartBalancerCallLocked(); } - Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); } // @@ -1737,17 +1734,10 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() { } } -void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) { - GrpcLb* grpclb_policy = static_cast(arg); - grpclb_policy->work_serializer()->Run( - [grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); }, - DEBUG_LOCATION); -} - -void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) { +void GrpcLb::OnFallbackTimerLocked() { // If we receive a serverlist after the timer fires but before this callback // actually runs, don't fall back. - if (fallback_at_startup_checks_pending_ && !shutting_down_ && error.ok()) { + if (fallback_at_startup_checks_pending_ && !shutting_down_) { gpr_log(GPR_INFO, "[grpclb %p] No response from balancer after fallback timeout; " "entering fallback mode", @@ -1757,7 +1747,6 @@ void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) { fallback_mode_ = true; CreateOrUpdateChildPolicyLocked(); } - Unref(DEBUG_LOCATION, "on_fallback_timer"); } // @@ -1780,7 +1769,8 @@ OrphanablePtr GrpcLb::CreateChildPolicyLocked( LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.work_serializer = work_serializer(); lb_policy_args.args = args; - lb_policy_args.channel_control_helper = std::make_unique(Ref()); + lb_policy_args.channel_control_helper = + std::make_unique(Ref(DEBUG_LOCATION, "Helper")); OrphanablePtr lb_policy = MakeOrphanable(std::move(lb_policy_args), &grpc_lb_glb_trace); @@ -1803,9 +1793,10 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { bool is_backend_from_grpclb_load_balancer = false; if (fallback_mode_) { // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't - // received any serverlist from the balancer, we use the fallback backends - // returned by the resolver. Note that the fallback backend list may be - // empty, in which case the new child policy will fail the picks. + // received any serverlist from the balancer, we use the fallback + // backends returned by the resolver. Note that the fallback backend + // list may be empty, in which case the new child policy will fail the + // picks. update_args.addresses = fallback_backend_addresses_; if (fallback_backend_addresses_.ok() && fallback_backend_addresses_->empty()) { @@ -1844,28 +1835,32 @@ void GrpcLb::CacheDeletedSubchannelLocked( RefCountedPtr subchannel) { Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_; cached_subchannels_[deletion_time].push_back(std::move(subchannel)); - if (!subchannel_cache_timer_pending_) { - Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release(); - subchannel_cache_timer_pending_ = true; + if (!subchannel_cache_timer_handle_.has_value()) { StartSubchannelCacheTimerLocked(); } } void GrpcLb::StartSubchannelCacheTimerLocked() { GPR_ASSERT(!cached_subchannels_.empty()); - grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first, - &on_subchannel_cache_timer_); -} - -void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - self->work_serializer()->Run( - [self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); }, - DEBUG_LOCATION); + subchannel_cache_timer_handle_ = + channel_control_helper()->GetEventEngine()->RunAfter( + cached_subchannels_.begin()->first - Timestamp::Now(), + [self = static_cast>( + Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer"))]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + auto* self_ptr = self.get(); + self_ptr->work_serializer()->Run( + [self = std::move(self)]() mutable { + self->OnSubchannelCacheTimerLocked(); + }, + DEBUG_LOCATION); + }); } -void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) { - if (subchannel_cache_timer_pending_ && error.ok()) { +void GrpcLb::OnSubchannelCacheTimerLocked() { + if (subchannel_cache_timer_handle_.has_value()) { + subchannel_cache_timer_handle_.reset(); auto it = cached_subchannels_.begin(); if (it != cached_subchannels_.end()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { @@ -1879,9 +1874,7 @@ void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) { StartSubchannelCacheTimerLocked(); return; } - subchannel_cache_timer_pending_ = false; } - Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer"); } // diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 45ef2877f1b..9a0b86d833e 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -1016,19 +1016,6 @@ TEST_F(SingleBalancerTest, SecureNaming) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, SecureNamingDeathTest) { - GTEST_FLAG_SET(death_test_style, "threadsafe"); - // Make sure that we blow up (via abort() from the security connector) when - // the name from the balancer doesn't match expectations. - ASSERT_DEATH_IF_SUPPORTED( - { - ResetStub(0, kApplicationTargetName_ + ";lb"); - SetNextResolution({AddressData{balancers_[0]->port_, "woops"}}); - channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1)); - }, - ""); -} - TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { SetNextResolutionAllBalancers(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); @@ -1474,6 +1461,23 @@ TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) { EXPECT_EQ(balancers_[0]->service_.service_names().back(), "test_service"); } +// This death test is kept separate from the rest to ensure that it's run before +// any others. See https://github.com/grpc/grpc/pull/32269 for details. +using SingleBalancerDeathTest = SingleBalancerTest; + +TEST_F(SingleBalancerDeathTest, SecureNaming) { + GTEST_FLAG_SET(death_test_style, "threadsafe"); + // Make sure that we blow up (via abort() from the security connector) when + // the name from the balancer doesn't match expectations. + ASSERT_DEATH_IF_SUPPORTED( + { + ResetStub(0, kApplicationTargetName_ + ";lb"); + SetNextResolution({AddressData{balancers_[0]->port_, "woops"}}); + channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1)); + }, + ""); +} + class UpdatesTest : public GrpclbEnd2endTest { public: UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}