priority LB: fix dumb reversed-conditional bug (#30149)

* C++ end2end tests: refactor ConnectionAttemptInjector code

* priority LB: fix dumb reversed-conditional bug

* clang-format

* add test
pull/30170/head
Mark D. Roth 3 years ago committed by GitHub
parent 0ec9ca254e
commit 74e1023f0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CMakeLists.txt
  2. 2
      build_autogenerated.yaml
  3. 4
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  4. 1
      test/cpp/end2end/xds/BUILD
  5. 42
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  6. 111
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

1
CMakeLists.txt generated

@ -18330,6 +18330,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h
test/cpp/end2end/connection_delay_injector.cc
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_cluster_end2end_test.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc

@ -9436,6 +9436,7 @@ targets:
run: false
language: c++
headers:
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h
@ -9475,6 +9476,7 @@ targets:
- src/proto/grpc/testing/xds/v3/route.proto
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- test/cpp/end2end/connection_delay_injector.cc
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_cluster_end2end_test.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc

@ -490,7 +490,7 @@ void PriorityLb::ChoosePriorityLocked() {
// immediately report CONNECTING and cause us to report that state
// anyway, but we do this just in case the child fails to report
// state before UpdateLocked() returns.
if (current_child_from_before_update_ != nullptr) {
if (current_child_from_before_update_ == nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
@ -524,7 +524,7 @@ void PriorityLb::ChoosePriorityLocked() {
current_priority_ = priority;
// If we're not still using an old child from before the last
// update, report CONNECTING here.
if (current_child_from_before_update_ != nullptr) {
if (current_child_from_before_update_ == nullptr) {
channel_control_helper()->UpdateState(child->connectivity_state(),
child->connectivity_status(),
child->GetPicker());

@ -166,6 +166,7 @@ grpc_cc_test(
"//:grpc",
"//:grpc++",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:connection_delay_injector",
],
)

@ -24,6 +24,8 @@
#include "absl/strings/str_cat.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
namespace grpc {
@ -538,10 +540,14 @@ TEST_P(EdsTest, BackendsRestart) {
WaitForAllBackends(DEBUG_LOCATION);
// Stop backends. RPCs should fail.
ShutdownAllBackends();
// Wait for channel to report TRANSIENT_FAILURE.
// Wait for channel to transition out of READY, so that we know it has
// noticed that all of the subchannels have failed. Note that it may
// be reporting either CONNECTING or TRANSIENT_FAILURE at this point.
EXPECT_TRUE(channel_->WaitForStateChange(
GRPC_CHANNEL_READY, grpc_timeout_seconds_to_deadline(5)));
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
EXPECT_THAT(channel_->GetState(false),
::testing::AnyOf(::testing::Eq(GRPC_CHANNEL_TRANSIENT_FAILURE),
::testing::Eq(GRPC_CHANNEL_CONNECTING)));
// RPCs should fail.
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
@ -1120,6 +1126,37 @@ TEST_P(FailoverTest, Failover) {
EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
}
// Reports CONNECTING when failing over to a lower priority.
TEST_P(FailoverTest, ReportsConnectingDuringFailover) {
CreateAndStartBackends(1);
// Priority 0 will be unreachable, so we'll use priority 1.
EdsResourceArgs args({
{"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
{"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
ConnectionHoldInjector injector;
injector.Start();
auto hold = injector.AddHold(backends_[0]->port());
// Start an RPC in the background, which should cause the channel to
// try to connect.
LongRunningRpc rpc;
rpc.StartRpc(stub_.get(), RpcOptions());
// Wait for connection attempt to start to the backend.
hold->Wait();
// Channel state should be CONNECTING here, and any RPC should be
// queued.
EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
// Allow the connection attempt to complete.
hold->Resume();
// Now the RPC should complete successfully.
gpr_log(GPR_INFO, "=== WAITING FOR RPC TO FINISH ===");
Status status = rpc.GetStatus();
gpr_log(GPR_INFO, "=== RPC FINISHED ===");
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
}
// If a locality with higher priority than the current one becomes ready,
// switch to it.
TEST_P(FailoverTest, SwitchBackToHigherPriority) {
@ -1672,6 +1709,7 @@ int main(int argc, char** argv) {
gpr_setenv("grpc_cfstream", "0");
#endif
grpc_init();
grpc::testing::ConnectionAttemptInjector::Init();
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;

@ -46,12 +46,17 @@ class RingHashTest : public XdsEnd2endTest {
logical_dns_cluster_resolver_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
InitClient();
ChannelArguments args;
args.SetPointerWithVtable(
SetUpChannel();
}
void SetUpChannel(ChannelArguments* args = nullptr) {
ChannelArguments local_args;
if (args == nullptr) args = &local_args;
args->SetPointerWithVtable(
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
logical_dns_cluster_resolver_response_generator_.get(),
&grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
ResetStub(/*failover_timeout_ms=*/0, &args);
ResetStub(/*failover_timeout_ms=*/0, args);
}
grpc_core::ServerAddressList CreateAddressListFromPortList(
@ -214,6 +219,106 @@ TEST_P(RingHashTest,
CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
}
TEST_P(RingHashTest,
AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRpcs) {
CreateAndStartBackends(1);
const char* kEdsClusterName = "eds_cluster";
const char* kLogicalDNSClusterName = "logical_dns_cluster";
// Populate EDS resource.
EdsResourceArgs args({
{"locality0",
{MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
kDefaultLocalityWeight,
0},
{"locality1",
{MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
kDefaultLocalityWeight,
1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Populate new CDS resources.
Cluster eds_cluster = default_cluster_;
eds_cluster.set_name(kEdsClusterName);
balancer_->ads_service()->SetCdsResource(eds_cluster);
// Populate LOGICAL_DNS cluster.
auto logical_dns_cluster = default_cluster_;
logical_dns_cluster.set_name(kLogicalDNSClusterName);
logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
auto* address = logical_dns_cluster.mutable_load_assignment()
->add_endpoints()
->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
address->set_address(kServerName);
address->set_port_value(443);
balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kEdsClusterName);
cluster_config.add_clusters(kLogicalDNSClusterName);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// Set up route with channel id hashing
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Set Logical DNS result
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Set up connection attempt injector.
ConnectionHoldInjector injector;
injector.Start();
auto hold = injector.AddHold(backends_[0]->port());
// Increase subchannel backoff time, so that subchannels stay in
// TRANSIENT_FAILURE for long enough to trigger potential problems.
ChannelArguments channel_args;
channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 10000);
SetUpChannel(&channel_args);
// Start an RPC in the background.
LongRunningRpc rpc;
rpc.StartRpc(stub_.get(), RpcOptions());
// Wait for connection attempt to the backend.
hold->Wait();
// Channel should report CONNECTING here, and any RPC should be queued.
EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
// Start a second RPC at this point, which should be queued as well.
// This will fail if the priority policy fails to update the picker to
// point to the LOGICAL_DNS child; if it leaves it pointing to the EDS
// priority 1, then the RPC will fail, because all subchannels are in
// TRANSIENT_FAILURE.
// Note that sending only the first RPC does not catch this case,
// because if the priority policy fails to update the picker, then the
// pick for the first RPC will not be retried.
LongRunningRpc rpc2;
rpc2.StartRpc(stub_.get(), RpcOptions());
// Allow the connection attempt to complete.
hold->Resume();
// Now the RPCs should complete successfully.
gpr_log(GPR_INFO, "=== WAITING FOR FIRST RPC TO FINISH ===");
Status status = rpc.GetStatus();
gpr_log(GPR_INFO, "=== FIRST RPC FINISHED ===");
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
gpr_log(GPR_INFO, "=== WAITING FOR SECOND RPC TO FINISH ===");
status = rpc2.GetStatus();
gpr_log(GPR_INFO, "=== SECOND RPC FINISHED ===");
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
}
// Tests that ring hash policy that hashes using channel id ensures all RPCs
// to go 1 particular backend.
TEST_P(RingHashTest, ChannelIdHashing) {

Loading…
Cancel
Save