pull/38144/head
Mark D. Roth, 5 days ago
parent 8bdef04d50
commit 4ec9f9ea1d

Changed files:
  1. src/core/load_balancing/pick_first/pick_first.cc (15)
  2. test/core/load_balancing/lb_policy_test_lib.h (40)
  3. test/core/load_balancing/pick_first_test.cc (68)

src/core/load_balancing/pick_first/pick_first.cc

@@ -423,7 +423,7 @@ PickFirst::PickFirst(Args args)
 PickFirst::~PickFirst() {
   GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this;
-  CHECK(subchannel_list_ == nullptr);
+  CHECK_EQ(subchannel_list_.get(), nullptr);
 }
 void PickFirst::ShutdownLocked() {
@@ -744,6 +744,8 @@ void PickFirst::SubchannelList::SubchannelData::SubchannelState::
   // If we're still part of a subchannel list trying to connect, check
   // if we're connected.
   if (subchannel_data_ != nullptr) {
+    CHECK_EQ(pick_first_->subchannel_list_.get(),
+             subchannel_data_->subchannel_list_);
     // If the subchannel is READY, use it.
     // Otherwise, tell the subchannel list to keep trying.
     if (new_state == GRPC_CHANNEL_READY) {
@@ -754,7 +756,7 @@ void PickFirst::SubchannelList::SubchannelData::SubchannelState::
     return;
   }
   // We aren't trying to connect, so we must be the selected subchannel.
-  CHECK(pick_first_->selected_.get() == this);
+  CHECK_EQ(pick_first_->selected_.get(), this);
   GRPC_TRACE_LOG(pick_first, INFO)
       << "Pick First " << pick_first_.get()
       << " selected subchannel connectivity changed to "
@@ -803,15 +805,14 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
       << ", p->subchannel_list_=" << p->subchannel_list_.get()
       << ", p->subchannel_list_->shutting_down_="
       << p->subchannel_list_->shutting_down_;
   if (subchannel_list_->shutting_down_) return;
   // The notification must be for a subchannel in the current list.
-  CHECK(subchannel_list_ == p->subchannel_list_.get());
+  CHECK_EQ(subchannel_list_, p->subchannel_list_.get());
   // SHUTDOWN should never happen.
-  CHECK(new_state != GRPC_CHANNEL_SHUTDOWN);
+  CHECK_NE(new_state, GRPC_CHANNEL_SHUTDOWN);
   // READY should be caught by SubchannelState, in which case it will
   // not call us in the first place.
-  CHECK(new_state != GRPC_CHANNEL_READY);
+  CHECK_NE(new_state, GRPC_CHANNEL_READY);
   // Update state.
   absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
   connectivity_state_ = new_state;
@@ -935,7 +936,7 @@ void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
   if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
     subchannel_state_->RequestConnection();
   } else {
-    CHECK(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
+    CHECK_EQ(connectivity_state_.value(), GRPC_CHANNEL_CONNECTING);
   }
   // If this is not the last subchannel in the list, start the timer.
   if (index_ != subchannel_list_->size() - 1) {
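The assert changes above all follow one pattern: CHECK(a == b) reports only the stringified condition when it fails, while CHECK_EQ and CHECK_NE also log the two operand values, which makes a crash report from the wild much easier to interpret. A minimal standalone sketch of the difference (not part of this PR; the values are placeholders):

    #include "absl/log/check.h"

    int main() {
      int expected_state = 1;  // stand-in for GRPC_CHANNEL_CONNECTING
      int actual_state = 3;    // stand-in for GRPC_CHANNEL_TRANSIENT_FAILURE
      // CHECK(expected_state == actual_state) would log only the condition text.
      // CHECK_EQ also logs the operands on failure, e.g.
      //   "Check failed: expected_state == actual_state (1 vs. 3)"
      CHECK_EQ(expected_state, actual_state);
      return 0;
    }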

test/core/load_balancing/lb_policy_test_lib.h

@@ -60,6 +60,7 @@
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/config/core_configuration.h"
 #include "src/core/lib/event_engine/default_event_engine.h"
+#include "src/core/lib/experiments/experiments.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/iomgr/resolved_address.h"
 #include "src/core/lib/security/credentials/credentials.h"
@@ -92,6 +93,9 @@ namespace testing {
 class LoadBalancingPolicyTest : public ::testing::Test {
  protected:
+  using FuzzingEventEngine =
+      grpc_event_engine::experimental::FuzzingEventEngine;
   using CallAttributes =
       std::vector<ServiceConfigCallData::CallAttributeInterface*>;
@@ -573,7 +577,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
     MutexLock lock(&mu_);
     StateUpdate update{
         state, status,
-        MakeRefCounted<PickerWrapper>(test_, std::move(picker))};
+        IsWorkSerializerDispatchEnabled()
+            ? std::move(picker)
+            : MakeRefCounted<PickerWrapper>(test_, std::move(picker))};
     LOG(INFO) << "enqueuing state update from LB policy: "
               << update.ToString();
     queue_.push_back(std::move(update));
@@ -698,10 +704,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
     // Order is important here: Fuzzing EE needs to be created before
     // grpc_init(), and the POSIX EE (which is used by the WorkSerializer)
     // needs to be created after grpc_init().
-    fuzzing_ee_ =
-        std::make_shared<grpc_event_engine::experimental::FuzzingEventEngine>(
-            grpc_event_engine::experimental::FuzzingEventEngine::Options(),
-            fuzzing_event_engine::Actions());
+    fuzzing_ee_ = MakeFuzzingEventEngine();
     grpc_init();
     event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
     work_serializer_ = std::make_shared<WorkSerializer>(event_engine_);
@@ -723,14 +726,16 @@ class LoadBalancingPolicyTest : public ::testing::Test {
     WaitForWorkSerializerToFlush();
     work_serializer_.reset();
     exec_ctx.Flush();
-    // Note: Can't safely trigger this from inside the FakeHelper dtor,
-    // because if there is a picker in the queue that is holding a ref
-    // to the LB policy, that will prevent the LB policy from being
-    // destroyed, and therefore the FakeHelper will not be destroyed.
-    // (This will cause an ASAN failure, but it will not display the
-    // queued events, so the failure will be harder to diagnose.)
-    helper_->ExpectQueueEmpty();
-    lb_policy_.reset();
+    if (lb_policy_ != nullptr) {
+      // Note: Can't safely trigger this from inside the FakeHelper dtor,
+      // because if there is a picker in the queue that is holding a ref
+      // to the LB policy, that will prevent the LB policy from being
+      // destroyed, and therefore the FakeHelper will not be destroyed.
+      // (This will cause an ASAN failure, but it will not display the
+      // queued events, so the failure will be harder to diagnose.)
+      helper_->ExpectQueueEmpty();
+      lb_policy_.reset();
+    }
     fuzzing_ee_->TickUntilIdle();
     grpc_event_engine::experimental::WaitForSingleOwner(
         std::move(event_engine_));
@@ -739,6 +744,12 @@ class LoadBalancingPolicyTest : public ::testing::Test {
     fuzzing_ee_.reset();
   }
+  virtual std::shared_ptr<FuzzingEventEngine> MakeFuzzingEventEngine() {
+    return std::make_shared<FuzzingEventEngine>(
+        grpc_event_engine::experimental::FuzzingEventEngine::Options(),
+        fuzzing_event_engine::Actions());
+  }
   LoadBalancingPolicy* lb_policy() const {
     CHECK(lb_policy_ != nullptr);
     return lb_policy_.get();
@@ -1465,8 +1476,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
     }
   }
-  std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
-      fuzzing_ee_;
+  std::shared_ptr<FuzzingEventEngine> fuzzing_ee_;
   // TODO(ctiller): this is a normal event engine, yet it gets its time measure
   // from fuzzing_ee_ -- results are likely to be a little funky, but seem to do
   // well enough for the tests we have today.
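The reason MakeFuzzingEventEngine() is introduced as a virtual hook is that SetUp() has to create the engine before grpc_init() runs, so a derived fixture cannot simply swap the member afterwards; it has to override the factory. A standalone sketch of that pattern, using hypothetical stand-in names (EngineBase, BaseFixture, NoCancelFixture) rather than the real classes:

    #include <memory>
    #include "gtest/gtest.h"

    // Hypothetical stand-in for FuzzingEventEngine.
    class EngineBase {
     public:
      virtual ~EngineBase() = default;
      virtual bool Cancel() { return true; }
    };

    // Hypothetical stand-in for LoadBalancingPolicyTest.
    class BaseFixture : public ::testing::Test {
     protected:
      void SetUp() override {
        // The engine must exist before the rest of the stack comes up, so the
        // hook is called first, just as the real SetUp() calls
        // MakeFuzzingEventEngine() before grpc_init().
        engine_ = MakeEngine();
      }
      virtual std::shared_ptr<EngineBase> MakeEngine() {
        return std::make_shared<EngineBase>();
      }
      std::shared_ptr<EngineBase> engine_;
    };

    // A derived fixture substitutes an engine whose cancellations always fail,
    // mirroring what pick_first_test.cc does with the new hook below.
    class NoCancelFixture : public BaseFixture {
     protected:
      class NoCancelEngine : public EngineBase {
       public:
        bool Cancel() override { return false; }
      };
      std::shared_ptr<EngineBase> MakeEngine() override {
        return std::make_shared<NoCancelEngine>();
      }
    };

    TEST_F(NoCancelFixture, CancellationAlwaysFails) {
      EXPECT_FALSE(engine_->Cancel());
    }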

test/core/load_balancing/pick_first_test.cc

@@ -75,7 +75,7 @@ class PickFirstTest : public LoadBalancingPolicyTest {
   }
   // Gets order the addresses are being picked. Return type is void so
-  // assertions can be used
+  // assertions can be used.
   void GetOrderAddressesArePicked(
       absl::Span<const absl::string_view> addresses,
       std::vector<absl::string_view>* out_address_order) {
@@ -1172,6 +1172,72 @@ TEST_F(PickFirstTest, AddressUpdateRetainsSelectedAddress) {
   EXPECT_FALSE(subchannel2->ConnectionRequested());
 }
+
+// DO NOT USE!
+//
+// A test class that overrides the FuzzingEventEngine to make timer
+// cancellation always fail. This is used to simulate cases where, at
+// the moment that the timer is cancelled, the timer has already fired
+// but the timer callback has not yet run in the WorkSerializer.
+//
+// TODO(roth): This is a really ugly hack. As part of changing these
+// tests to use the FuzzingEventEngine exclusively, we should instead
+// find a way to tick the FuzzingEventEngine to the right point so that
+// we don't need this ugliness.
+class PickFirstNoCancelTimerTest : public PickFirstTest {
+ protected:
+  class FuzzingEventEngineWithoutTimerCancellation : public FuzzingEventEngine {
+   public:
+    using FuzzingEventEngine::FuzzingEventEngine;
+    bool Cancel(TaskHandle) override { return false; }
+  };
+
+  std::shared_ptr<FuzzingEventEngine> MakeFuzzingEventEngine() override {
+    return std::make_shared<FuzzingEventEngineWithoutTimerCancellation>(
+        grpc_event_engine::experimental::FuzzingEventEngine::Options(),
+        fuzzing_event_engine::Actions());
+  }
+};
+
+// This exercises a bug seen in the wild that caused a crash. For
+// details, see https://github.com/grpc/grpc/pull/38144.
+TEST_F(PickFirstNoCancelTimerTest, SubchannelNotificationAfterShutdown) {
+  // Send an update containing two addresses.
+  constexpr std::array<absl::string_view, 2> kAddresses = {
+      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
+  absl::Status status = ApplyUpdate(
+      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
+  EXPECT_TRUE(status.ok()) << status;
+  // LB policy should have created a subchannel for each address.
+  auto* subchannel = FindSubchannel(kAddresses[0]);
+  ASSERT_NE(subchannel, nullptr);
+  auto* subchannel2 = FindSubchannel(kAddresses[1]);
+  ASSERT_NE(subchannel2, nullptr);
+  // When the LB policy receives the first subchannel's initial connectivity
+  // state notification (IDLE), it will request a connection.
+  EXPECT_TRUE(subchannel->ConnectionRequested());
+  // This causes the subchannel to start to connect, so it reports CONNECTING.
+  subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
+  // LB policy should have reported CONNECTING state.
+  ExpectConnectingUpdate();
+  // Now shut down the LB policy.
+  // This will cancel the Happy Eyeballs timer, but since we're using a
+  // FuzzingEventEngine that fails timer cancellations, it simulates the
+  // case where the timer has already fired but the timer callback has
+  // not yet run inside the WorkSerializer.
+  lb_policy_.reset();
+  // Now the subchannel reports READY. Before the bug fix, this caused
+  // us to select the subchannel instead of ignoring the notification.
+  // With the bug fix, this update should never actually be delivered to
+  // the LB policy, since it will have already shut down the subchannel.
+  subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
+  // Now trigger the Happy Eyeballs timer to fire.
+  IncrementTimeBy(Duration::Milliseconds(250));
+  // Now the subchannel reports IDLE. Before the bug fix, this
+  // triggered a crash.
+  subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
+}
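What the new test relies on is the EventEngine cancellation contract: Cancel() returning false means the timer callback has already been committed to run, so the Happy Eyeballs callback can still execute after the LB policy has been shut down. A toy model of that race (plain C++, not gRPC code; every name here is illustrative):

    #include <functional>
    #include <queue>

    // Toy model: once Cancel() reports failure, the callback is already queued
    // and will still run after "shutdown", so it must tolerate that.
    struct ToyTimer {
      std::queue<std::function<void()>> fired;  // callbacks that already fired

      void Schedule(std::function<void()> cb) { fired.push(std::move(cb)); }
      // Mirrors FuzzingEventEngineWithoutTimerCancellation::Cancel() above:
      // always report that cancellation failed.
      bool Cancel() { return false; }
      void Drain() {
        while (!fired.empty()) {
          fired.front()();
          fired.pop();
        }
      }
    };

    int main() {
      ToyTimer timer;
      bool policy_alive = true;
      timer.Schedule([&] {
        // With the bug, this callback touched state owned by the already
        // destroyed policy; with the fix it becomes a no-op after shutdown.
        if (!policy_alive) return;
      });
      policy_alive = false;  // the test's lb_policy_.reset()
      (void)timer.Cancel();  // cancellation fails: the callback is still queued
      timer.Drain();         // callback runs after shutdown and must not crash
      return 0;
    }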
 TEST_F(PickFirstTest, WithShuffle) {
   constexpr std::array<absl::string_view, 6> kAddresses = {
       "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
