From d3d4dd7325919c0ecb09a76ee8eca4004ce46e69 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 5 Jan 2023 12:45:52 -0800 Subject: [PATCH] StatsPluginEnd2EndTest: Add QueueOnceLoadBalancingPolicy to remove flakiness from test (#32019) * StatsPluginEnd2EndTest: Add QueueOnceLoadBalancingPolicy to remove flakiness from test * Unused arg * Remove unnecessary DelegatingPicker * Reviewer comments * explicit constructor * Remove unnecessary dependency * Reviewer comments * Sanity * clang tidy --- test/core/util/test_lb_policies.cc | 112 ++++++++++++++++++ test/core/util/test_lb_policies.h | 3 + test/cpp/ext/filters/census/BUILD | 1 + .../census/stats_plugin_end2end_test.cc | 18 ++- 4 files changed, 133 insertions(+), 1 deletion(-) diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 40ca272b02d..c3b30fc9785 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -746,6 +746,113 @@ class FailLbFactory : public LoadBalancingPolicyFactory { std::atomic* pick_counter_; }; +// +// QueueOnceLoadBalancingPolicy - a load balancing policy that provides a Queue +// PickResult at least once, after which it delegates to PickFirst. +// + +constexpr char kQueueOncePolicyName[] = "queue_once"; + +class QueueOnceLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { + public: + explicit QueueOnceLoadBalancingPolicy(Args args) + : ForwardingLoadBalancingPolicy( + std::make_unique( + RefCountedPtr(this)), + std::move(args), "pick_first", + /*initial_refcount=*/2) {} + + // We use the standard QueuePicker which invokes ExitIdleLocked() on the first + // pick. + void ExitIdleLocked() override { + bool needs_update = !std::exchange(seen_pick_queued_, true); + if (needs_update) { + channel_control_helper()->UpdateState(state_to_update_.state, + state_to_update_.status, + std::move(state_to_update_.picker)); + } + } + + absl::string_view name() const override { return kQueueOncePolicyName; } + + private: + class Helper : public ChannelControlHelper { + public: + explicit Helper(RefCountedPtr parent) + : parent_(std::move(parent)) {} + + RefCountedPtr CreateSubchannel( + ServerAddress address, const ChannelArgs& args) override { + return parent_->channel_control_helper()->CreateSubchannel( + std::move(address), args); + } + + void UpdateState(grpc_connectivity_state state, const absl::Status& status, + RefCountedPtr picker) override { + // If we've already seen a queued pick, just propagate the update + // directly. + if (parent_->seen_pick_queued_) { + parent_->channel_control_helper()->UpdateState(state, status, + std::move(picker)); + return; + } + // Otherwise, store the update in the LB policy, to be propagated later, + // and return a queueing picker. + parent_->state_to_update_ = {state, status, std::move(picker)}; + parent_->channel_control_helper()->UpdateState( + state, status, MakeRefCounted(parent_->Ref())); + } + + void RequestReresolution() override { + parent_->channel_control_helper()->RequestReresolution(); + } + + absl::string_view GetAuthority() override { + return parent_->channel_control_helper()->GetAuthority(); + } + + grpc_event_engine::experimental::EventEngine* GetEventEngine() override { + return parent_->channel_control_helper()->GetEventEngine(); + } + + void AddTraceEvent(TraceSeverity severity, + absl::string_view message) override { + parent_->channel_control_helper()->AddTraceEvent(severity, message); + } + + private: + RefCountedPtr parent_; + }; + struct StateToUpdate { + grpc_connectivity_state state; + absl::Status status; + RefCountedPtr picker; + }; + StateToUpdate state_to_update_; + bool seen_pick_queued_ = false; // Has a pick been queued yet. Only accessed + // from within the WorkSerializer. +}; + +class QueueOnceLbConfig : public LoadBalancingPolicy::Config { + public: + absl::string_view name() const override { return kQueueOncePolicyName; } +}; + +class QueueOnceLoadBalancingPolicyFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr CreateLoadBalancingPolicy( + LoadBalancingPolicy::Args args) const override { + return MakeOrphanable(std::move(args)); + } + + absl::string_view name() const override { return kQueueOncePolicyName; } + + absl::StatusOr> + ParseLoadBalancingConfig(const Json& /*json*/) const override { + return MakeRefCounted(); + } +}; + } // namespace void RegisterTestPickArgsLoadBalancingPolicy( @@ -788,4 +895,9 @@ void RegisterFailLoadBalancingPolicy(CoreConfiguration::Builder* builder, std::make_unique(std::move(status), pick_counter)); } +void RegisterQueueOnceLoadBalancingPolicy(CoreConfiguration::Builder* builder) { + builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( + std::make_unique()); +} + } // namespace grpc_core diff --git a/test/core/util/test_lb_policies.h b/test/core/util/test_lb_policies.h index afec49b29df..9298b946992 100644 --- a/test/core/util/test_lb_policies.h +++ b/test/core/util/test_lb_policies.h @@ -91,6 +91,9 @@ void RegisterFailLoadBalancingPolicy(CoreConfiguration::Builder* builder, absl::Status status, std::atomic* pick_counter = nullptr); +// Registers an LB policy called "queue_once" that queues at least one pick, and +// then delegates to PickFirst. +void RegisterQueueOnceLoadBalancingPolicy(CoreConfiguration::Builder* builder); } // namespace grpc_core #endif // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H diff --git a/test/cpp/ext/filters/census/BUILD b/test/cpp/ext/filters/census/BUILD index 815e7ad41f6..91220b1e3ac 100644 --- a/test/cpp/ext/filters/census/BUILD +++ b/test/cpp/ext/filters/census/BUILD @@ -38,6 +38,7 @@ grpc_cc_test( "//:grpc_opencensus_plugin", "//src/proto/grpc/testing:echo_proto", "//test/core/util:grpc_test_util", + "//test/core/util:test_lb_policies", "//test/cpp/end2end:test_service_impl", "//test/cpp/util:test_config", "//test/cpp/util:test_util", diff --git a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc index 2c5683eefc4..22877a8a894 100644 --- a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc +++ b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc @@ -35,11 +35,14 @@ #include #include "src/core/lib/channel/call_tracer.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/load_balancing/lb_policy.h" #include "src/cpp/ext/filters/census/context.h" #include "src/cpp/ext/filters/census/grpc_plugin.h" #include "src/cpp/ext/filters/census/open_census_call_tracer.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/test_config.h" +#include "test/core/util/test_lb_policies.h" #include "test/cpp/end2end/test_service_impl.h" namespace opencensus { @@ -144,6 +147,11 @@ class ExportedTracesRecorder class StatsPluginEnd2EndTest : public ::testing::Test { protected: static void SetUpTestSuite() { + grpc_core::CoreConfiguration::Reset(); + grpc_core::CoreConfiguration::RegisterBuilder( + [](grpc_core::CoreConfiguration::Builder* builder) { + grpc_core::RegisterQueueOnceLoadBalancingPolicy(builder); + }); RegisterOpenCensusPlugin(); // OpenCensus C++ has no API to unregister a previously-registered handler, // therefore we register this handler once, and enable/disable recording in @@ -152,6 +160,11 @@ class StatsPluginEnd2EndTest : public ::testing::Test { absl::WrapUnique(traces_recorder_)); } + static void TearDownTestSuite() { + grpc_shutdown(); + grpc_core::CoreConfiguration::Reset(); + } + void SetUp() override { // Set up a synchronous server on a different thread to avoid the asynch // interface. @@ -760,7 +773,10 @@ TEST_F(StatsPluginEnd2EndTest, TestRemovePendingResolverResultAndPendingLbPickQueueAnnotations) { { // Client spans are ended when the ClientContext's destructor is invoked. - auto channel = CreateChannel(server_address_, InsecureChannelCredentials()); + ChannelArguments args; + args.SetLoadBalancingPolicyName("queue_once"); + auto channel = CreateCustomChannel(server_address_, + InsecureChannelCredentials(), args); ResetStub(channel); EchoRequest request; request.set_message("foo");