Reland "EventEngine::RunAfter: GrpcLb (#32262)" (#32269)

There was a ~1% flake in grpclb end2end tests that was reproducible in opt builds, manifesting as a hang, usually in the SingleBalancerTest.Fallback test. Through experimentation, I found that by skipping the death test in the grpclb end2end test suite, the hang was no longer reproducible in 10,000 runs. Similarly, moving this test to the end of the suite, or making it run first (as is the case in this PR), resulted in 0 failures in 3,000 runs.

It's unclear to me yet why the death test causes things to be unstable in this way. It's clear from the logs that one test does affect the rest: grpc_init is done once for all tests, so all tests utilize the same EventEngine — until the death test completes and a new EventEngine is created for the next test.

I think this death test is sufficiently artificial that it's fine to change the test ordering itself, and ignore the wonky intermediate state that results from it.

Reproducing the flake:

```
tools/bazel --bazelrc=tools/remote_build/linux.bazelrc test \
  -c opt \
  --test_env=GRPC_TRACE=event_engine \
  --runs_per_test=5000 \
  --test_output=summary \
  test/cpp/end2end/grpclb_end2end_test@poller=epoll1 
```
pull/32276/head
AJ Heller 2 years ago committed by GitHub
parent 8ed8a3a054
commit e4af983819
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 167
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 30
      test/cpp/end2end/grpclb_end2end_test.cc

@ -3702,7 +3702,6 @@ grpc_cc_library(
"//:grpc_resolver_fake",
"//:grpc_security_base",
"//:grpc_trace",
"//:iomgr_timer",
"//:orphanable",
"//:protobuf_duration_upb",
"//:protobuf_timestamp_upb",

@ -65,6 +65,7 @@
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -122,7 +123,6 @@
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@ -495,7 +495,8 @@ class GrpcLb : public LoadBalancingPolicy {
"entering fallback mode",
parent_.get(), status.ToString().c_str());
parent_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&parent_->lb_fallback_timer_);
parent_->channel_control_helper()->GetEventEngine()->Cancel(
*parent_->lb_fallback_timer_handle_);
parent_->fallback_mode_ = true;
parent_->CreateOrUpdateChildPolicyLocked();
// Cancel the watch, since we don't care about the channel state once we
@ -516,14 +517,12 @@ class GrpcLb : public LoadBalancingPolicy {
// Methods for dealing with fallback state.
void MaybeEnterFallbackModeAfterStartup();
static void OnFallbackTimer(void* arg, grpc_error_handle error);
void OnFallbackTimerLocked(grpc_error_handle error);
void OnFallbackTimerLocked();
// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error);
void OnBalancerCallRetryTimerLocked(grpc_error_handle error);
void OnBalancerCallRetryTimerLocked();
// Methods for dealing with the child policy.
ChannelArgs CreateChildPolicyArgsLocked(
@ -536,8 +535,7 @@ class GrpcLb : public LoadBalancingPolicy {
void CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel);
void StartSubchannelCacheTimerLocked();
static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error);
void OnSubchannelCacheTimerLocked(grpc_error_handle error);
void OnSubchannelCacheTimerLocked();
// Who the client is trying to communicate with.
std::string server_name_;
@ -568,9 +566,7 @@ class GrpcLb : public LoadBalancingPolicy {
const Duration lb_call_timeout_;
// Balancer call retry state.
BackOff lb_call_backoff_;
bool retry_timer_callback_pending_ = false;
grpc_timer lb_call_retry_timer_;
grpc_closure lb_on_call_retry_;
absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
@ -588,8 +584,7 @@ class GrpcLb : public LoadBalancingPolicy {
// we have not received a serverlist from the balancer.
const Duration fallback_at_startup_timeout_;
bool fallback_at_startup_checks_pending_ = false;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_;
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
@ -601,9 +596,7 @@ class GrpcLb : public LoadBalancingPolicy {
std::map<Timestamp /*deletion time*/,
std::vector<RefCountedPtr<SubchannelInterface>>>
cached_subchannels_;
grpc_timer subchannel_cache_timer_;
grpc_closure on_subchannel_cache_timer_;
bool subchannel_cache_timer_pending_ = false;
absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_;
};
//
@ -1262,7 +1255,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
}
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
// Update the serverlist in the GrpcLb instance. This serverlist
@ -1280,7 +1274,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
grpclb_policy());
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
grpclb_policy()->fallback_mode_ = true;
@ -1347,7 +1342,8 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
"serverlist; entering fallback mode",
grpclb_policy());
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy()->fallback_mode_ = true;
grpclb_policy()->CreateOrUpdateChildPolicyLocked();
@ -1499,29 +1495,25 @@ GrpcLb::GrpcLb(Args args)
"[grpclb %p] Will use '%s' as the server name for LB request.",
this, server_name_.c_str());
}
// Closure Initialization
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this,
nullptr);
}
void GrpcLb::ShutdownLocked() {
shutting_down_ = true;
lb_calld_.reset();
if (subchannel_cache_timer_pending_) {
subchannel_cache_timer_pending_ = false;
grpc_timer_cancel(&subchannel_cache_timer_);
if (subchannel_cache_timer_handle_.has_value()) {
channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_cache_timer_handle_);
subchannel_cache_timer_handle_.reset();
}
cached_subchannels_.clear();
if (retry_timer_callback_pending_) {
grpc_timer_cancel(&lb_call_retry_timer_);
if (lb_call_retry_timer_handle_.has_value()) {
channel_control_helper()->GetEventEngine()->Cancel(
*lb_call_retry_timer_handle_);
}
if (fallback_at_startup_checks_pending_) {
fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&lb_fallback_timer_);
channel_control_helper()->GetEventEngine()->Cancel(
*lb_fallback_timer_handle_);
CancelBalancerChannelConnectivityWatchLocked();
}
if (child_policy_ != nullptr) {
@ -1582,9 +1574,18 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (is_initial_update) {
fallback_at_startup_checks_pending_ = true;
// Start timer.
Timestamp deadline = Timestamp::Now() + fallback_at_startup_timeout_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
lb_fallback_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
fallback_at_startup_timeout_,
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "on_fallback_timer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() { self->OnFallbackTimerLocked(); },
DEBUG_LOCATION);
});
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
@ -1674,10 +1675,9 @@ void GrpcLb::StartBalancerCallLocked() {
}
void GrpcLb::StartBalancerCallRetryTimerLocked() {
Timestamp next_try = lb_call_backoff_.NextAttemptTime();
Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
Duration timeout = next_try - Timestamp::Now();
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
this, timeout.millis());
@ -1686,33 +1686,30 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
this);
}
}
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
self.release();
retry_timer_callback_pending_ = true;
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->work_serializer()->Run(
[grpclb_policy, error]() {
grpclb_policy->OnBalancerCallRetryTimerLocked(error);
},
DEBUG_LOCATION);
lb_call_retry_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
timeout,
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() {
self->OnBalancerCallRetryTimerLocked();
},
DEBUG_LOCATION);
});
}
void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) {
retry_timer_callback_pending_ = false;
if (!shutting_down_ && error.ok() && lb_calld_ == nullptr) {
void GrpcLb::OnBalancerCallRetryTimerLocked() {
lb_call_retry_timer_handle_.reset();
if (!shutting_down_ && lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
}
StartBalancerCallLocked();
}
Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
//
@ -1737,17 +1734,10 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
}
}
void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->work_serializer()->Run(
[grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); },
DEBUG_LOCATION);
}
void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
void GrpcLb::OnFallbackTimerLocked() {
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (fallback_at_startup_checks_pending_ && !shutting_down_ && error.ok()) {
if (fallback_at_startup_checks_pending_ && !shutting_down_) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
@ -1757,7 +1747,6 @@ void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
Unref(DEBUG_LOCATION, "on_fallback_timer");
}
//
@ -1780,7 +1769,8 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper = std::make_unique<Helper>(Ref());
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_glb_trace);
@ -1803,9 +1793,10 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new child policy will fail the picks.
// received any serverlist from the balancer, we use the fallback
// backends returned by the resolver. Note that the fallback backend
// list may be empty, in which case the new child policy will fail the
// picks.
update_args.addresses = fallback_backend_addresses_;
if (fallback_backend_addresses_.ok() &&
fallback_backend_addresses_->empty()) {
@ -1844,28 +1835,32 @@ void GrpcLb::CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel) {
Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
cached_subchannels_[deletion_time].push_back(std::move(subchannel));
if (!subchannel_cache_timer_pending_) {
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release();
subchannel_cache_timer_pending_ = true;
if (!subchannel_cache_timer_handle_.has_value()) {
StartSubchannelCacheTimerLocked();
}
}
void GrpcLb::StartSubchannelCacheTimerLocked() {
GPR_ASSERT(!cached_subchannels_.empty());
grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first,
&on_subchannel_cache_timer_);
}
void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) {
auto* self = static_cast<GrpcLb*>(arg);
self->work_serializer()->Run(
[self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); },
DEBUG_LOCATION);
subchannel_cache_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
cached_subchannels_.begin()->first - Timestamp::Now(),
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto* self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() mutable {
self->OnSubchannelCacheTimerLocked();
},
DEBUG_LOCATION);
});
}
void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
if (subchannel_cache_timer_pending_ && error.ok()) {
void GrpcLb::OnSubchannelCacheTimerLocked() {
if (subchannel_cache_timer_handle_.has_value()) {
subchannel_cache_timer_handle_.reset();
auto it = cached_subchannels_.begin();
if (it != cached_subchannels_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
@ -1879,9 +1874,7 @@ void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
StartSubchannelCacheTimerLocked();
return;
}
subchannel_cache_timer_pending_ = false;
}
Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer");
}
//

@ -1016,19 +1016,6 @@ TEST_F(SingleBalancerTest, SecureNaming) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
// Death test: verifies the client aborts when the balancer-provided name does
// not satisfy the secure-naming check. NOTE(review): this placement runs the
// death test mid-suite; the suite shares one EventEngine across tests (see the
// commit message above), which appears to destabilize subsequent tests.
TEST_F(SingleBalancerTest, SecureNamingDeathTest) {
// "threadsafe" re-executes the binary from scratch for the death statement,
// which is required because this process has live threads when the test runs.
GTEST_FLAG_SET(death_test_style, "threadsafe");
// Make sure that we blow up (via abort() from the security connector) when
// the name from the balancer doesn't match expectations.
ASSERT_DEATH_IF_SUPPORTED(
{
ResetStub(0, kApplicationTargetName_ + ";lb");
SetNextResolution({AddressData{balancers_[0]->port_, "woops"}});
channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
},
"");
}
TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
SetNextResolutionAllBalancers();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
@ -1474,6 +1461,23 @@ TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
EXPECT_EQ(balancers_[0]->service_.service_names().back(), "test_service");
}
// This death test is kept separate from the rest to ensure that it's run before
// any others. See https://github.com/grpc/grpc/pull/32269 for details.
// The "DeathTest"-suffixed suite name matters: googletest schedules suites
// whose names end in "DeathTest" before all other suites, so this alias is
// what guarantees the ordering described above.
using SingleBalancerDeathTest = SingleBalancerTest;
// Death test: verifies the client aborts (via the security connector) when the
// balancer-provided name fails the secure-naming check.
TEST_F(SingleBalancerDeathTest, SecureNaming) {
// "threadsafe" re-executes the binary for the death statement; required
// because this process has live threads when the statement runs.
GTEST_FLAG_SET(death_test_style, "threadsafe");
// Make sure that we blow up (via abort() from the security connector) when
// the name from the balancer doesn't match expectations.
ASSERT_DEATH_IF_SUPPORTED(
{
ResetStub(0, kApplicationTargetName_ + ";lb");
SetNextResolution({AddressData{balancers_[0]->port_, "woops"}});
channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
},
"");
}
class UpdatesTest : public GrpclbEnd2endTest {
public:
UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}

Loading…
Cancel
Save