StatsPluginEnd2EndTest: Add QueueOnceLoadBalancingPolicy to remove flakiness from test (#32019)

* StatsPluginEnd2EndTest: Add QueueOnceLoadBalancingPolicy to remove flakiness from test

* Unused arg

* Remove unnecessary DelegatingPicker

* Reviewer comments

* explicit constructor

* Remove unnecessary dependency

* Reviewer comments

* Sanity

* clang tidy
pull/32037/head
Yash Tibrewal 2 years ago committed by GitHub
parent 9ff89e81c4
commit d3d4dd7325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 112
      test/core/util/test_lb_policies.cc
  2. 3
      test/core/util/test_lb_policies.h
  3. 1
      test/cpp/ext/filters/census/BUILD
  4. 18
      test/cpp/ext/filters/census/stats_plugin_end2end_test.cc

@ -746,6 +746,113 @@ class FailLbFactory : public LoadBalancingPolicyFactory {
std::atomic<int>* pick_counter_;
};
//
// QueueOnceLoadBalancingPolicy - a load balancing policy that provides a Queue
// PickResult at least once, after which it delegates to PickFirst.
//
constexpr char kQueueOncePolicyName[] = "queue_once";
class QueueOnceLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
public:
explicit QueueOnceLoadBalancingPolicy(Args args)
: ForwardingLoadBalancingPolicy(
std::make_unique<Helper>(
RefCountedPtr<QueueOnceLoadBalancingPolicy>(this)),
std::move(args), "pick_first",
/*initial_refcount=*/2) {}
// We use the standard QueuePicker which invokes ExitIdleLocked() on the first
// pick.
void ExitIdleLocked() override {
bool needs_update = !std::exchange(seen_pick_queued_, true);
if (needs_update) {
channel_control_helper()->UpdateState(state_to_update_.state,
state_to_update_.status,
std::move(state_to_update_.picker));
}
}
absl::string_view name() const override { return kQueueOncePolicyName; }
private:
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<QueueOnceLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override {
return parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) override {
// If we've already seen a queued pick, just propagate the update
// directly.
if (parent_->seen_pick_queued_) {
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
return;
}
// Otherwise, store the update in the LB policy, to be propagated later,
// and return a queueing picker.
parent_->state_to_update_ = {state, status, std::move(picker)};
parent_->channel_control_helper()->UpdateState(
state, status, MakeRefCounted<QueuePicker>(parent_->Ref()));
}
void RequestReresolution() override {
parent_->channel_control_helper()->RequestReresolution();
}
absl::string_view GetAuthority() override {
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return parent_->channel_control_helper()->GetEventEngine();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
private:
RefCountedPtr<QueueOnceLoadBalancingPolicy> parent_;
};
struct StateToUpdate {
grpc_connectivity_state state;
absl::Status status;
RefCountedPtr<SubchannelPicker> picker;
};
StateToUpdate state_to_update_;
bool seen_pick_queued_ = false; // Has a pick been queued yet. Only accessed
// from within the WorkSerializer.
};
class QueueOnceLbConfig : public LoadBalancingPolicy::Config {
public:
absl::string_view name() const override { return kQueueOncePolicyName; }
};
class QueueOnceLoadBalancingPolicyFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<QueueOnceLoadBalancingPolicy>(std::move(args));
}
absl::string_view name() const override { return kQueueOncePolicyName; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& /*json*/) const override {
return MakeRefCounted<QueueOnceLbConfig>();
}
};
} // namespace
void RegisterTestPickArgsLoadBalancingPolicy(
@ -788,4 +895,9 @@ void RegisterFailLoadBalancingPolicy(CoreConfiguration::Builder* builder,
std::make_unique<FailLbFactory>(std::move(status), pick_counter));
}
void RegisterQueueOnceLoadBalancingPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<QueueOnceLoadBalancingPolicyFactory>());
}
} // namespace grpc_core

@ -91,6 +91,9 @@ void RegisterFailLoadBalancingPolicy(CoreConfiguration::Builder* builder,
absl::Status status,
std::atomic<int>* pick_counter = nullptr);
// Registers an LB policy called "queue_once" that queues at least one pick, and
// then delegates to PickFirst.
void RegisterQueueOnceLoadBalancingPolicy(CoreConfiguration::Builder* builder);
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H

@ -38,6 +38,7 @@ grpc_cc_test(
"//:grpc_opencensus_plugin",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:test_lb_policies",
"//test/cpp/end2end:test_service_impl",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",

@ -35,11 +35,14 @@
#include <grpcpp/opencensus.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace opencensus {
@ -144,6 +147,11 @@ class ExportedTracesRecorder
class StatsPluginEnd2EndTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
grpc_core::CoreConfiguration::Reset();
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::RegisterQueueOnceLoadBalancingPolicy(builder);
});
RegisterOpenCensusPlugin();
// OpenCensus C++ has no API to unregister a previously-registered handler,
// therefore we register this handler once, and enable/disable recording in
@ -152,6 +160,11 @@ class StatsPluginEnd2EndTest : public ::testing::Test {
absl::WrapUnique(traces_recorder_));
}
static void TearDownTestSuite() {
grpc_shutdown();
grpc_core::CoreConfiguration::Reset();
}
void SetUp() override {
// Set up a synchronous server on a different thread to avoid the asynch
// interface.
@ -760,7 +773,10 @@ TEST_F(StatsPluginEnd2EndTest,
TestRemovePendingResolverResultAndPendingLbPickQueueAnnotations) {
{
// Client spans are ended when the ClientContext's destructor is invoked.
auto channel = CreateChannel(server_address_, InsecureChannelCredentials());
ChannelArguments args;
args.SetLoadBalancingPolicyName("queue_once");
auto channel = CreateCustomChannel(server_address_,
InsecureChannelCredentials(), args);
ResetStub(channel);
EchoRequest request;
request.set_message("foo");

Loading…
Cancel
Save