xds_end2end_test: Move ring_hash tests to their own file (#29287)

* move some code around

* remove num_backends parameter from XdsEnd2endTest

* remove use_xds_enabled_server param from XdsEnd2endTest

* remove xds_resource_does_not_exist_timeout_ms param from XdsEnd2endTest

* remove client_load_reporting_interval_seconds param from XdsEnd2endTest

* start moving CreateAndStartBackends() into individual tests

* finish moving CreateAndStartBackends() into individual tests

* remove unused variable

* remove SetEdsResourceWithDelay

* fix test flake

* clang-tidy

* clang-format

* move test framework to its own library

* fix build

* clang-format

* fix windows build

* rename TestType to XdsTestType

* move BackendServiceImpl inside of BackendServerThread

* clang-format

* move AdminServerThread to CSDS test suite

* move ring_hash tests to their own file

* generate_projects

* remove unnecessary deps

* re-add flaky tag

* clang-format
Branch: pull/29298/head
Author: Mark D. Roth (committed by GitHub)
Parent: b9d904da58
Commit: bea5911569
Changed files:
  CMakeLists.txt (173 lines)
  build_autogenerated.yaml (57 lines)
  test/cpp/end2end/xds/BUILD (29 lines)
  test/cpp/end2end/xds/xds_end2end_test.cc (889 lines)
  test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc (985 lines)

CMakeLists.txt (generated)

@@ -1194,6 +1194,9 @@ if(gRPC_BUILD_TESTS)
endif()
add_dependencies(buildtests_cxx xds_interop_client)
add_dependencies(buildtests_cxx xds_interop_server)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_ring_hash_end2end_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_rls_end2end_test)
endif()
@@ -18044,6 +18047,176 @@ target_link_libraries(xds_interop_server
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(xds_ring_hash_end2end_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h
test/cpp/end2end/connection_delay_injector.cc
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc
test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc
test/cpp/end2end/xds/xds_server.cc
test/cpp/util/tls_test_utils.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(xds_ring_hash_end2end_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(xds_ring_hash_end2end_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

build_autogenerated.yaml
@@ -8679,6 +8679,63 @@ targets:
- grpcpp_channelz
- grpc_test_util
- grpc++_test_config
- name: xds_ring_hash_end2end_test
gtest: true
build: test
run: false
language: c++
headers:
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h
- test/cpp/end2end/xds/xds_server.h
- test/cpp/util/tls_test_utils.h
src:
- src/proto/grpc/testing/duplicate/echo_duplicate.proto
- src/proto/grpc/testing/echo.proto
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/ads_for_test.proto
- src/proto/grpc/testing/xds/eds_for_test.proto
- src/proto/grpc/testing/xds/lrs_for_test.proto
- src/proto/grpc/testing/xds/v3/address.proto
- src/proto/grpc/testing/xds/v3/ads.proto
- src/proto/grpc/testing/xds/v3/aggregate_cluster.proto
- src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/cluster.proto
- src/proto/grpc/testing/xds/v3/config_source.proto
- src/proto/grpc/testing/xds/v3/discovery.proto
- src/proto/grpc/testing/xds/v3/endpoint.proto
- src/proto/grpc/testing/xds/v3/expr.proto
- src/proto/grpc/testing/xds/v3/extension.proto
- src/proto/grpc/testing/xds/v3/http_connection_manager.proto
- src/proto/grpc/testing/xds/v3/http_filter_rbac.proto
- src/proto/grpc/testing/xds/v3/listener.proto
- src/proto/grpc/testing/xds/v3/load_report.proto
- src/proto/grpc/testing/xds/v3/lrs.proto
- src/proto/grpc/testing/xds/v3/metadata.proto
- src/proto/grpc/testing/xds/v3/path.proto
- src/proto/grpc/testing/xds/v3/percent.proto
- src/proto/grpc/testing/xds/v3/protocol.proto
- src/proto/grpc/testing/xds/v3/range.proto
- src/proto/grpc/testing/xds/v3/rbac.proto
- src/proto/grpc/testing/xds/v3/regex.proto
- src/proto/grpc/testing/xds/v3/route.proto
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- test/cpp/end2end/connection_delay_injector.cc
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc
- test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc
- test/cpp/end2end/xds/xds_server.cc
- test/cpp/util/tls_test_utils.cc
deps:
- grpc++_test_util
platforms:
- linux
- posix
- mac
- name: xds_rls_end2end_test
gtest: true
build: test

test/cpp/end2end/xds/BUILD
@@ -219,6 +219,35 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "xds_ring_hash_end2end_test",
size = "large",
srcs = ["xds_ring_hash_end2end_test.cc"],
external_deps = [
"gtest",
],
flaky = True, # TODO(b/144705388)
linkstatic = True, # Fixes dyld error on MacOS
tags = [
"no_test_ios",
"no_windows",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpc_resolver_fake",
"//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto",
"//src/proto/grpc/testing/xds/v3:cluster_proto",
"//src/proto/grpc/testing/xds/v3:endpoint_proto",
"//src/proto/grpc/testing/xds/v3:listener_proto",
"//src/proto/grpc/testing/xds/v3:route_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:connection_delay_injector",
],
)
grpc_cc_test(
name = "xds_rls_end2end_test",
size = "large",

test/cpp/end2end/xds/xds_end2end_test.cc
@@ -652,14 +652,6 @@ class CdsTest : public XdsEnd2endTest {
return addresses;
}
std::string CreateMetadataValueThatHashesToBackendPort(int port) {
return absl::StrCat(ipv6_only_ ? "[::1]" : "127.0.0.1", ":", port, "_0");
}
std::string CreateMetadataValueThatHashesToBackend(int index) {
return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
}
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
logical_dns_cluster_resolver_response_generator_;
};
@@ -1004,136 +996,6 @@ TEST_P(CdsTest, AggregateClusterDiamondDependency) {
WaitForBackend(0);
}
TEST_P(CdsTest, AggregateClusterFallBackFromRingHashAtStartup) {
ScopedExperimentalEnvVar env_var(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
CreateAndStartBackends(2);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
// Populate new EDS resources.
EdsResourceArgs args1({
{"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}},
});
EdsResourceArgs args2({
{"locality0", CreateEndpointsForBackends()},
});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args1, kNewEdsService1Name));
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args2, kNewEdsService2Name));
// Populate new CDS resources.
Cluster new_cluster1 = default_cluster_;
new_cluster1.set_name(kNewCluster1Name);
new_cluster1.mutable_eds_cluster_config()->set_service_name(
kNewEdsService1Name);
balancer_->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = default_cluster_;
new_cluster2.set_name(kNewCluster2Name);
new_cluster2.mutable_eds_cluster_config()->set_service_name(
kNewEdsService2Name);
balancer_->ads_service()->SetCdsResource(new_cluster2);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kNewCluster1Name);
cluster_config.add_clusters(kNewCluster2Name);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// Set up route with channel id hashing
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Verify that we are using ring hash: only one endpoint should receive all
// the traffic.
CheckRpcSendOk(100);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
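The "exactly one backend received all the traffic" loop above recurs verbatim in several of the ring hash tests below. A minimal sketch of a helper that could express the check once (hypothetical; ExpectSingleBackendReceivedAllRpcs is not part of this change):

// Hypothetical helper (not in this change): asserts that exactly one backend
// received all num_rpcs requests, the expected outcome when every RPC maps
// to the same ring hash entry.
void ExpectSingleBackendReceivedAllRpcs(size_t num_rpcs) {
  absl::optional<size_t> backend_seen;
  for (size_t i = 0; i < backends_.size(); ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), num_rpcs)
          << "backend " << i;
      EXPECT_FALSE(backend_seen.has_value()) << "backend " << i;
      backend_seen = i;
    }
  }
  EXPECT_TRUE(backend_seen.has_value());
}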
TEST_P(CdsTest, AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
ScopedExperimentalEnvVar env_var(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
CreateAndStartBackends(1);
const char* kEdsClusterName = "eds_cluster";
const char* kLogicalDNSClusterName = "logical_dns_cluster";
// Populate EDS resource.
EdsResourceArgs args({
{"locality0",
{MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
kDefaultLocalityWeight,
0},
{"locality1",
{MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
kDefaultLocalityWeight,
1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Populate new CDS resources.
Cluster eds_cluster = default_cluster_;
eds_cluster.set_name(kEdsClusterName);
balancer_->ads_service()->SetCdsResource(eds_cluster);
// Populate LOGICAL_DNS cluster.
auto logical_dns_cluster = default_cluster_;
logical_dns_cluster.set_name(kLogicalDNSClusterName);
logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
auto* address = logical_dns_cluster.mutable_load_assignment()
->add_endpoints()
->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
address->set_address(kServerName);
address->set_port_value(443);
balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kEdsClusterName);
cluster_config.add_clusters(kLogicalDNSClusterName);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// Set up route with channel id hashing
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Set Logical DNS result
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Inject connection delay to make this act more realistically.
ConnectionDelayInjector delay_injector(
grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor());
// Send RPC. Need the timeout to be long enough to account for the
// subchannel connection delays.
CheckRpcSendOk(1, RpcOptions().set_timeout_ms(3500));
}
// This test covers a bug found in the following scenario:
// 1. P0 reports TRANSIENT_FAILURE, so we start connecting to P1.
// 2. While P1 is still in CONNECTING, P0 goes back to READY, so we
@@ -1681,343 +1543,9 @@ TEST_P(CdsTest, WrongLrsServer) {
::testing::HasSubstr("LRS ConfigSource is not self."));
}
// Tests that a ring hash policy that hashes using the channel id sends all
// RPCs to one particular backend.
TEST_P(CdsTest, RingHashChannelIdHashing) {
CreateAndStartBackends(4);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
CheckRpcSendOk(100);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Tests that a ring hash policy that hashes using a header value can spread
// RPCs across all the backends.
TEST_P(CdsTest, RingHashHeaderHashing) {
CreateAndStartBackends(4);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note that each type of RPC contains a header value that will always be
// hashed to a specific backend, since the header value matches the value
// used to create that backend's entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
std::vector<std::pair<std::string, std::string>> metadata1 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
std::vector<std::pair<std::string, std::string>> metadata2 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
std::vector<std::pair<std::string, std::string>> metadata3 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
WaitForBackend(1, WaitForBackendOptions(), rpc_options1);
WaitForBackend(2, WaitForBackendOptions(), rpc_options2);
WaitForBackend(3, WaitForBackendOptions(), rpc_options3);
CheckRpcSendOk(100, rpc_options);
CheckRpcSendOk(100, rpc_options1);
CheckRpcSendOk(100, rpc_options2);
CheckRpcSendOk(100, rpc_options3);
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
}
}
// Tests that a ring hash policy that hashes using a header value with a
// regex rewrite aggregates RPCs to one backend.
TEST_P(CdsTest, RingHashHeaderHashingWithRegexRewrite) {
CreateAndStartBackends(4);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
hash_policy->mutable_header()
->mutable_regex_rewrite()
->mutable_pattern()
->set_regex("[0-9]+");
hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
"foo");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
std::vector<std::pair<std::string, std::string>> metadata1 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
std::vector<std::pair<std::string, std::string>> metadata2 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
std::vector<std::pair<std::string, std::string>> metadata3 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
CheckRpcSendOk(100, rpc_options);
CheckRpcSendOk(100, rpc_options1);
CheckRpcSendOk(100, rpc_options2);
CheckRpcSendOk(100, rpc_options3);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Tests a ring hash policy that hashes using a random value.
TEST_P(CdsTest, RingHashNoHashPolicy) {
CreateAndStartBackends(2);
const double kDistribution50Percent = 0.5;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int request_count_1 = backends_[0]->backend_service()->request_count();
const int request_count_2 = backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
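ComputeIdealNumRpcs, used above, chooses a sample size large enough that the observed per-backend fraction stays within kErrorTolerance of the expected probability with high confidence. A minimal sketch of that computation, assuming a normal approximation to the binomial with a z-score of 5 (the exact constant used by the test library may differ):

#include <cmath>    // assumption: the sketch is self-contained
#include <cstddef>

// n >= z^2 * p * (1 - p) / tolerance^2; with p = 0.5, tolerance = 0.05,
// and z = 5, this yields 2500 RPCs.
size_t ComputeIdealNumRpcsSketch(double p, double error_tolerance) {
  const double z = 5.0;  // assumed, very conservative confidence level
  return static_cast<size_t>(
      std::ceil(z * z * p * (1 - p) / (error_tolerance * error_tolerance)));
}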
// Test that hash policy evaluation will continue past a terminal policy if
// that policy has not produced a result yet.
TEST_P(CdsTest, RingHashContinuesPastTerminalPolicyThatDoesNotProduceResult) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("header_not_present");
hash_policy->set_terminal(true);
auto* hash_policy2 = route->mutable_route()->add_hash_policy();
hash_policy2->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
CheckRpcSendOk(100, rpc_options);
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
}
// Test that a random hash is used when header hashing specifies a header
// field that the RPC does not have.
TEST_P(CdsTest, RingHashOnHeaderThatIsNotPresent) {
CreateAndStartBackends(2);
const double kDistribution50Percent = 0.5;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("header_not_present");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"unmatched_header", absl::StrFormat("%" PRIu32, rand())},
};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs, rpc_options);
const int request_count_1 = backends_[0]->backend_service()->request_count();
const int request_count_2 = backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
// Test that a random hash is used when only unsupported hash policies are
// configured.
TEST_P(CdsTest, RingHashUnsupportedHashPolicyDefaultToRandomHashing) {
CreateAndStartBackends(2);
const double kDistribution50Percent = 0.5;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
true);
auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_3->mutable_query_parameter()->set_name(
"query_parameter");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int request_count_1 = backends_[0]->backend_service()->request_count();
const int request_count_2 = backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
// Tests that a ring hash policy that hashes using a random value can spread
// RPCs across all the backends according to endpoint weight.
TEST_P(CdsTest, RingHashRandomHashingDistributionAccordingToEndpointWeight) {
CreateAndStartBackends(2);
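  // Endpoint weights of 1 and 2 give an expected 1/3 vs. 2/3 traffic split,
  // which is what kWeight33Percent and kWeight66Percent compute below.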
const size_t kWeight1 = 1;
const size_t kWeight2 = 2;
const size_t kWeightTotal = kWeight1 + kWeight2;
const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({{"locality0",
{CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int weight_33_request_count =
backends_[0]->backend_service()->request_count();
const int weight_66_request_count =
backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
}
// Tests that a ring hash policy that hashes using a random value can spread
// RPCs across all the backends according to the product of locality and
// endpoint weight.
TEST_P(CdsTest,
RingHashRandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
CreateAndStartBackends(2);
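  // Each endpoint's effective ring weight is locality weight * endpoint
  // weight: locality0 contributes 1 * 1 = 1 and locality1 contributes
  // 2 * 2 = 4, for an expected 20% / 80% traffic split.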
const size_t kWeight1 = 1 * 1;
const size_t kWeight2 = 2 * 2;
const size_t kWeightTotal = kWeight1 + kWeight2;
const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args(
{{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
{"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int weight_20_request_count =
backends_[0]->backend_service()->request_count();
const int weight_80_request_count =
backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
}
// Tests that round robin is not impacted by the endpoint weight, and that
// the localities in a locality map are picked according to their weights.
-TEST_P(CdsTest, RingHashEndpointWeightDoesNotImpactWeightedRoundRobin) {
+TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) {
CreateAndStartBackends(2);
const int kLocalityWeight0 = 2;
const int kLocalityWeight1 = 8;
@@ -2056,421 +1584,6 @@ TEST_P(CdsTest, RingHashEndpointWeightDoesNotImpactWeightedRoundRobin) {
::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
}
// Tests that a ring hash policy that hashes using a fixed string sends all
// RPCs to one particular backend, and that subsequent hash policies are
// ignored because the first policy is marked terminal.
TEST_P(CdsTest, RingHashFixedHashingTerminalPolicy) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("fixed_string");
hash_policy->set_terminal(true);
auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"fixed_string", "fixed_value"},
{"random_string", absl::StrFormat("%" PRIu32, rand())},
};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
CheckRpcSendOk(100, rpc_options);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Test that the channel will go from IDLE to READY via CONNECTING
// (though it is not possible to catch the CONNECTING state before it moves
// to READY).
TEST_P(CdsTest, RingHashIdleToReady) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
CheckRpcSendOk();
EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
// Test that the channel will transition to READY once it starts
// connecting even if there are no RPCs being sent to the picker.
TEST_P(CdsTest, RingHashContinuesConnectingWithoutPicks) {
// Create EDS resource.
CreateAndStartBackends(1);
auto non_existant_endpoint = MakeNonExistantEndpoint();
EdsResourceArgs args(
{{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Change CDS resource to use RING_HASH.
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
// Add hash policy to RDS resource.
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// A connection injector that cancels the RPC after seeing the
// connection attempt for the non-existent endpoint.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
explicit ConnectionInjector(int port) : port_(port) {}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
{
grpc_core::MutexLock lock(&mu_);
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): seen_port_=%d, port=%d",
seen_port_, port);
// The initial attempt should be for port_, which should fail.
// Cancel the RPC at this point, so that it's no longer
// queued when the LB policy updates the picker.
if (!seen_port_ && port == port_) {
gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT");
seen_port_ = true;
cond_.Signal();
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
void WaitForP0ConnectionAttempt() {
grpc_core::MutexLock lock(&mu_);
while (!seen_port_) {
cond_.Wait(&mu_);
}
}
private:
const int port_;
grpc_core::Mutex mu_;
grpc_core::CondVar cond_;
bool seen_port_ ABSL_GUARDED_BY(mu_) = false;
};
ConnectionInjector connection_injector(non_existant_endpoint.port);
// A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc;
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash",
CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}};
rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
std::move(metadata)));
// Wait for the RPC to trigger the P0 connection attempt, then cancel it.
connection_injector.WaitForP0ConnectionAttempt();
rpc.CancelRpc();
// Wait for channel to become connected without any pending RPC.
EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
// RPC should have been cancelled.
EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
// Make sure the backend did not get any requests.
EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
// Test that when the first picked endpoint is down, causing a transient
// failure, we will move on to the next ring hash entry.
TEST_P(CdsTest, RingHashTransientFailureCheckNextOne) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
std::vector<EdsResourceArgs::Endpoint> endpoints;
const int unused_port = grpc_pick_unused_port_or_die();
endpoints.emplace_back(unused_port);
endpoints.emplace_back(backends_[0]->port());
EdsResourceArgs args({{"locality0", std::move(endpoints)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash",
CreateMetadataValueThatHashesToBackendPort(unused_port)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
CheckRpcSendOk(100, rpc_options);
}
// Test that when a backend goes down, we will move on to the next subchannel
// (with a lower priority). When the backend comes back up, traffic will move
// back.
TEST_P(CdsTest, RingHashSwitchToLowerPrioirtyAndThenBack) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
0},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
ShutdownBackend(0);
WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true),
rpc_options);
StartBackend(0);
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
CheckRpcSendOk(100, rpc_options);
EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
}
// Test that when all backends are down, we will keep reattempting.
TEST_P(CdsTest, RingHashAllFailReattempt) {
CreateAndStartBackends(1);
const uint32_t kConnectionTimeoutMilliseconds = 5000;
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args(
{{"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
ShutdownBackend(0);
CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options(
RpcOptions().set_metadata(std::move(metadata))));
StartBackend(0);
// Ensure we are actively connecting without any traffic.
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
}
// Test that when all backends go down and then come back up, we may first
// pick a backend in TRANSIENT_FAILURE and will then jump to a READY backend.
TEST_P(CdsTest, RingHashTransientFailureSkipToAvailableReady) {
CreateAndStartBackends(2);
const uint32_t kConnectionTimeoutMilliseconds = 5000;
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Make sure we include some unused ports to fill the ring.
EdsResourceArgs args({
{"locality0",
{CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(),
MakeNonExistantEndpoint()}},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
ShutdownBackend(0);
ShutdownBackend(1);
CheckRpcSendFailure(
CheckRpcSendFailureOptions().set_rpc_options(rpc_options));
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
// Bring up 0, should be picked as the RPC is hashed to it.
StartBackend(0);
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
// Bring down 0 and bring up 1.
// Note the RPC contains a header value that will always hash to backend 0.
// So by purposely bringing down backend 0 and bringing up another backend,
// we ensure that the picker's first choice of backend 0 will fail, and it
// will then
// 1. reattempt backend 0, and
// 2. go through the remaining subchannels to find one in READY.
// Since the entries in the ring are fairly evenly distributed and we have
// unused ports to fill the ring, it is almost guaranteed that the picker
// will go through some non-READY entries and skip them, as designed.
ShutdownBackend(0);
CheckRpcSendFailure(
CheckRpcSendFailureOptions().set_rpc_options(rpc_options));
StartBackend(1);
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
WaitForBackend(1, WaitForBackendOptions(), rpc_options);
}
// Test that unsupported hash policy types are all ignored before a
// supported policy.
TEST_P(CdsTest, RingHashUnsupportedHashPolicyUntilChannelIdHashing) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
true);
auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_3->mutable_query_parameter()->set_name(
"query_parameter");
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
CheckRpcSendOk(100);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Test that we NACK when the ring hash policy has an invalid hash function
// (something other than XX_HASH).
TEST_P(CdsTest, RingHashPolicyHasInvalidHashFunction) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->set_hash_function(
Cluster::RingHashLbConfig::MURMUR_HASH_2);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(
response_state->error_message,
::testing::HasSubstr("ring hash lb config has invalid hash function."));
}
// Test that we NACK when the ring hash policy has an invalid minimum ring
// size.
TEST_P(CdsTest, RingHashPolicyHasInvalidMinimumRingSize) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
0);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(
"min_ring_size is not in the range of 1 to 8388608."));
}
// Test that we NACK when the ring hash policy has an invalid maximum ring
// size.
TEST_P(CdsTest, RingHashPolicyHasInvalidMaxmumRingSize) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
8388609);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(
"max_ring_size is not in the range of 1 to 8388608."));
}
// Test that we NACK when the ring hash policy's minimum ring size is
// greater than its maximum ring size.
TEST_P(CdsTest, RingHashPolicyHasInvalidRingSizeMinGreaterThanMax) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
5000);
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
5001);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(
"min_ring_size cannot be greater than max_ring_size."));
}
class XdsSecurityTest : public XdsEnd2endTest {
protected:
void SetUp() override {

test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc (new file)
@@ -0,0 +1,985 @@
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/gpr/env.h"
#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
namespace grpc {
namespace testing {
namespace {
using ::envoy::config::cluster::v3::CustomClusterType;
using ::envoy::config::endpoint::v3::HealthStatus;
using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;
class RingHashTest : public XdsEnd2endTest {
protected:
void SetUp() override {
logical_dns_cluster_resolver_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
InitClient();
ChannelArguments args;
args.SetPointerWithVtable(
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
logical_dns_cluster_resolver_response_generator_.get(),
&grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
ResetStub(/*failover_timeout_ms=*/0, &args);
}
grpc_core::ServerAddressList CreateAddressListFromPortList(
const std::vector<int>& ports) {
grpc_core::ServerAddressList addresses;
for (int port : ports) {
absl::StatusOr<grpc_core::URI> lb_uri = grpc_core::URI::Parse(
absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port));
GPR_ASSERT(lb_uri.ok());
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
addresses.emplace_back(address.addr, address.len, nullptr);
}
return addresses;
}
std::string CreateMetadataValueThatHashesToBackendPort(int port) {
return absl::StrCat(ipv6_only_ ? "[::1]" : "127.0.0.1", ":", port, "_0");
}
std::string CreateMetadataValueThatHashesToBackend(int index) {
return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
}
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
logical_dns_cluster_resolver_response_generator_;
};
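// An illustrative sketch, not part of the original test file: it shows why
// CreateMetadataValueThatHashesToBackendPort() above pins an RPC to one
// endpoint. The assumption (hedged, not confirmed by this file) is that the
// ring_hash policy keys each ring entry by XXH64("<address>_<entry_index>")
// with seed 0 and hashes a header-based request key the same way, so a
// header value of "<ip>:<port>_0" collides exactly with that endpoint's
// first ring entry.
#include "xxhash.h"  // assumed available to this sketch; provides XXH64()
uint64_t RingEntryHashSketch(const std::string& address, size_t entry_index) {
// Build the same "<address>_<entry_index>" key the assumption describes.
std::string key = absl::StrCat(address, "_", entry_index);
return XXH64(key.data(), key.size(), /*seed=*/0);
}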
// Run both with and without load reporting, just for test coverage.
INSTANTIATE_TEST_SUITE_P(
XdsTest, RingHashTest,
::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
&XdsTestType::Name);
TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
ScopedExperimentalEnvVar env_var(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
CreateAndStartBackends(2);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
// Populate new EDS resources.
EdsResourceArgs args1({
{"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}},
});
EdsResourceArgs args2({
{"locality0", CreateEndpointsForBackends()},
});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args1, kNewEdsService1Name));
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args2, kNewEdsService2Name));
// Populate new CDS resources.
Cluster new_cluster1 = default_cluster_;
new_cluster1.set_name(kNewCluster1Name);
new_cluster1.mutable_eds_cluster_config()->set_service_name(
kNewEdsService1Name);
balancer_->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = default_cluster_;
new_cluster2.set_name(kNewCluster2Name);
new_cluster2.mutable_eds_cluster_config()->set_service_name(
kNewEdsService2Name);
balancer_->ads_service()->SetCdsResource(new_cluster2);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kNewCluster1Name);
cluster_config.add_clusters(kNewCluster2Name);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// Set up route with channel id hashing
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Verify that we are using ring hash: exactly one endpoint should be
// receiving all the traffic.
CheckRpcSendOk(100);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
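// A hedged refactoring sketch, not part of the original file: the "exactly
// one backend received all RPCs" loop above recurs in several tests below
// and could be hoisted into a helper. The helper name is hypothetical, and
// it is written as a template to avoid assuming the exact backends_ type.
template <typename BackendVector>
void ExpectSingleBackendGotAllRpcsSketch(const BackendVector& backends,
size_t num_rpcs) {
bool found = false;
for (size_t i = 0; i < backends.size(); ++i) {
if (backends[i]->backend_service()->request_count() > 0) {
// The one backend that saw traffic must have seen every RPC.
EXPECT_EQ(backends[i]->backend_service()->request_count(), num_rpcs)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}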
TEST_P(RingHashTest,
AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
ScopedExperimentalEnvVar env_var(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
CreateAndStartBackends(1);
const char* kEdsClusterName = "eds_cluster";
const char* kLogicalDNSClusterName = "logical_dns_cluster";
// Populate EDS resource.
EdsResourceArgs args({
{"locality0",
{MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
kDefaultLocalityWeight,
0},
{"locality1",
{MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
kDefaultLocalityWeight,
1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Populate new CDS resources.
Cluster eds_cluster = default_cluster_;
eds_cluster.set_name(kEdsClusterName);
balancer_->ads_service()->SetCdsResource(eds_cluster);
// Populate LOGICAL_DNS cluster.
auto logical_dns_cluster = default_cluster_;
logical_dns_cluster.set_name(kLogicalDNSClusterName);
logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
auto* address = logical_dns_cluster.mutable_load_assignment()
->add_endpoints()
->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
address->set_address(kServerName);
address->set_port_value(443);
balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
// Create Aggregate Cluster
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kEdsClusterName);
cluster_config.add_clusters(kLogicalDNSClusterName);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// Set up route with channel id hashing
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Set Logical DNS result
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
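// Usage note (an observation, not original text): SetResponse() on the fake
// resolver needs to run within a grpc_core::ExecCtx, which is why the result
// above is injected inside its own scoped block.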
// Inject connection delay to make this act more realistically.
ConnectionDelayInjector delay_injector(
grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor());
// Send RPC. Need the timeout to be long enough to account for the
// subchannel connection delays.
CheckRpcSendOk(1, RpcOptions().set_timeout_ms(3500));
}
// Tests that a ring hash policy hashing on channel id sends all RPCs to one
// particular backend.
TEST_P(RingHashTest, ChannelIdHashing) {
CreateAndStartBackends(4);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
CheckRpcSendOk(100);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Tests that a ring hash policy hashing on a header value can spread RPCs
// across all the backends.
TEST_P(RingHashTest, HeaderHashing) {
CreateAndStartBackends(4);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Note: each type of RPC contains a header value that always hashes to a
// specific backend, since the header value matches the value used to create
// that backend's entry in the ring.
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
std::vector<std::pair<std::string, std::string>> metadata1 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
std::vector<std::pair<std::string, std::string>> metadata2 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
std::vector<std::pair<std::string, std::string>> metadata3 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
WaitForBackend(1, WaitForBackendOptions(), rpc_options1);
WaitForBackend(2, WaitForBackendOptions(), rpc_options2);
WaitForBackend(3, WaitForBackendOptions(), rpc_options3);
CheckRpcSendOk(100, rpc_options);
CheckRpcSendOk(100, rpc_options1);
CheckRpcSendOk(100, rpc_options2);
CheckRpcSendOk(100, rpc_options3);
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
}
}
// Tests that a ring hash policy hashing on a header value with a regex
// rewrite aggregates RPCs onto one backend.
TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) {
CreateAndStartBackends(4);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
hash_policy->mutable_header()
->mutable_regex_rewrite()
->mutable_pattern()
->set_regex("[0-9]+");
hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
"foo");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
std::vector<std::pair<std::string, std::string>> metadata1 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
std::vector<std::pair<std::string, std::string>> metadata2 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
std::vector<std::pair<std::string, std::string>> metadata3 = {
{"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
CheckRpcSendOk(100, rpc_options);
CheckRpcSendOk(100, rpc_options1);
CheckRpcSendOk(100, rpc_options2);
CheckRpcSendOk(100, rpc_options3);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Tests that when no hash policy is configured, the ring hash policy hashes
// using a random value, spreading RPCs across the backends.
TEST_P(RingHashTest, NoHashPolicy) {
CreateAndStartBackends(2);
const double kDistribution50Percent = 0.5;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int request_count_1 = backends_[0]->backend_service()->request_count();
const int request_count_2 = backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
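// A hedged sketch, not part of the original file, of the sample-size
// reasoning behind ComputeIdealNumRpcs(): under a binomial normal
// approximation, observing a true proportion p to within +/- e with a
// z-sigma confidence level needs roughly p*(1-p)*(z/e)^2 samples. The z
// value below is a hypothetical choice; the real helper in the test library
// may use a different constant.
#include <cmath>  // assumed available to this sketch, for std::ceil
size_t ComputeIdealNumRpcsSketch(double p, double error_tolerance) {
const double z = 5.0;  // assumed confidence multiplier
return static_cast<size_t>(
std::ceil(p * (1 - p) * z * z / (error_tolerance * error_tolerance)));
}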
// Test that ring hash policy evaluation will continue past the terminal
// policy if no results are produced yet.
TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("header_not_present");
hash_policy->set_terminal(true);
auto* hash_policy2 = route->mutable_route()->add_hash_policy();
hash_policy2->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
CheckRpcSendOk(100, rpc_options);
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
}
// Test that a random hash is used when the hash policy specifies a header
// field that the RPC does not have.
TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) {
CreateAndStartBackends(2);
const double kDistribution50Percent = 0.5;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("header_not_present");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"unmatched_header", absl::StrFormat("%" PRIu32, rand())},
};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs, rpc_options);
const int request_count_1 = backends_[0]->backend_service()->request_count();
const int request_count_2 = backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
// Test that a random hash is used when only unsupported hash policies are
// configured.
TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) {
CreateAndStartBackends(2);
const double kDistribution50Percent = 0.5;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
true);
auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_3->mutable_query_parameter()->set_name(
"query_parameter");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int request_count_1 = backends_[0]->backend_service()->request_count();
const int request_count_2 = backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
// Tests that a ring hash policy hashing on a random value can spread RPCs
// across all the backends according to endpoint weight.
TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) {
CreateAndStartBackends(2);
const size_t kWeight1 = 1;
const size_t kWeight2 = 2;
const size_t kWeightTotal = kWeight1 + kWeight2;
const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({{"locality0",
{CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int weight_33_request_count =
backends_[0]->backend_service()->request_count();
const int weight_66_request_count =
backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
}
// Tests that a ring hash policy hashing on a random value can spread RPCs
// across all the backends according to the product of locality and endpoint
// weight.
TEST_P(RingHashTest,
RandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
CreateAndStartBackends(2);
const size_t kWeight1 = 1 * 1;
const size_t kWeight2 = 2 * 2;
const size_t kWeightTotal = kWeight1 + kWeight2;
const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
const double kErrorTolerance = 0.05;
const uint32_t kRpcTimeoutMs = 10000;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
auto cluster = default_cluster_;
// Increasing min ring size for random distribution.
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
100000);
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args(
{{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
{"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// TODO(donnadionne): remove extended timeout after ring creation
// optimization.
WaitForAllBackends(0, 2, WaitForBackendOptions(),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
CheckRpcSendOk(kNumRpcs);
const int weight_20_request_count =
backends_[0]->backend_service()->request_count();
const int weight_80_request_count =
backends_[1]->backend_service()->request_count();
EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
}
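// Worked weights for the test above: locality weights {1, 2} multiply the
// endpoint weights {1, 2}, giving effective weights 1*1 = 1 and 2*2 = 4,
// i.e. expected fractions 1/5 = 20% and 4/5 = 80%.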
// Tests that a ring hash policy hashing on a fixed header value sends all
// RPCs to one particular backend, and that subsequent hash policies are
// ignored because the first one is marked terminal.
TEST_P(RingHashTest, FixedHashingTerminalPolicy) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("fixed_string");
hash_policy->set_terminal(true);
auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"fixed_string", "fixed_value"},
{"random_string", absl::StrFormat("%" PRIu32, rand())},
};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
CheckRpcSendOk(100, rpc_options);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Test that the channel goes from IDLE to READY via CONNECTING (though it
// is not possible to catch the CONNECTING state before the channel moves to
// READY).
TEST_P(RingHashTest, IdleToReady) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
CheckRpcSendOk();
EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
// Test that the channel will transition to READY once it starts
// connecting even if there are no RPCs being sent to the picker.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
// Create EDS resource.
CreateAndStartBackends(1);
auto non_existent_endpoint = MakeNonExistantEndpoint();
EdsResourceArgs args(
{{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Change CDS resource to use RING_HASH.
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
// Add hash policy to RDS resource.
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// A connection injector that cancels the RPC after seeing the
// connection attempt for the non-existent endpoint.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
explicit ConnectionInjector(int port) : port_(port) {}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
{
grpc_core::MutexLock lock(&mu_);
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): seen_port_=%d, port=%d",
seen_port_, port);
if (!seen_port_ && port == port_) {
gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT");
seen_port_ = true;
cond_.Signal();
}
}
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
void WaitForP0ConnectionAttempt() {
grpc_core::MutexLock lock(&mu_);
while (!seen_port_) {
cond_.Wait(&mu_);
}
}
private:
const int port_;
grpc_core::Mutex mu_;
grpc_core::CondVar cond_;
bool seen_port_ ABSL_GUARDED_BY(mu_) = false;
};
ConnectionInjector connection_injector(non_existent_endpoint.port);
// A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc;
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash",
CreateMetadataValueThatHashesToBackendPort(non_existent_endpoint.port)}};
rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
std::move(metadata)));
// Wait for the RPC to trigger the P0 connection attempt, then cancel it.
connection_injector.WaitForP0ConnectionAttempt();
rpc.CancelRpc();
// Wait for channel to become connected without any pending RPC.
EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
// RPC should have been cancelled.
EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
// Make sure the backend did not get any requests.
EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
// Test that when the endpoint for the first ring entry is down (causing a
// transient failure), we move on to the next entry in the ring.
TEST_P(RingHashTest, TransientFailureCheckNextOne) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
std::vector<EdsResourceArgs::Endpoint> endpoints;
const int unused_port = grpc_pick_unused_port_or_die();
endpoints.emplace_back(unused_port);
endpoints.emplace_back(backends_[0]->port());
EdsResourceArgs args({{"locality0", std::move(endpoints)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash",
CreateMetadataValueThatHashesToBackendPort(unused_port)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
CheckRpcSendOk(100, rpc_options);
}
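// A hedged sketch, not part of the original file, of the ring traversal the
// test above relies on. The assumption is that the picker binary-searches
// for the first ring entry whose hash is >= the request hash (wrapping
// around the ring) and then walks forward until it finds an endpoint in
// READY; the function name is hypothetical.
#include <algorithm>  // assumed available to this sketch, for std::lower_bound
size_t FindFirstRingIndexSketch(const std::vector<uint64_t>& ring_hashes,
uint64_t request_hash) {
// ring_hashes must be sorted ascending; wrap past the largest entry.
auto it = std::lower_bound(ring_hashes.begin(), ring_hashes.end(),
request_hash);
if (it == ring_hashes.end()) it = ring_hashes.begin();
return static_cast<size_t>(it - ring_hashes.begin());
}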
// Test that when a backend goes down, we will move on to the next subchannel
// (with a lower priority). When the backend comes back up, traffic will move
// back.
TEST_P(RingHashTest, SwitchToLowerPriorityAndThenBack) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
0},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
ShutdownBackend(0);
WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true),
rpc_options);
StartBackend(0);
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
CheckRpcSendOk(100, rpc_options);
EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
}
// Test that when all backends are down, we will keep reattempting.
TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
CreateAndStartBackends(1);
const uint32_t kConnectionTimeoutMilliseconds = 5000;
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args(
{{"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
ShutdownBackend(0);
CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options(
RpcOptions().set_metadata(std::move(metadata))));
StartBackend(0);
// Ensure we are actively connecting without any traffic.
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
}
// Test that when all backends go down and then come back up, we may
// initially pick a backend in TRANSIENT_FAILURE but will then jump to a
// READY backend.
TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
CreateAndStartBackends(2);
const uint32_t kConnectionTimeoutMilliseconds = 5000;
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Make sure we include some unused ports to fill the ring.
EdsResourceArgs args({
{"locality0",
{CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(),
MakeNonExistantEndpoint()}},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
ShutdownBackend(0);
ShutdownBackend(1);
CheckRpcSendFailure(
CheckRpcSendFailureOptions().set_rpc_options(rpc_options));
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
// Bring up 0, should be picked as the RPC is hashed to it.
StartBackend(0);
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
WaitForBackend(0, WaitForBackendOptions(), rpc_options);
// Bring down 0 and bring up 1.
// Note that the RPC contains a header value that always hashes to
// backend 0. So by purposely bringing down backend 0 and bringing up
// another backend, we ensure that the picker's first choice of backend 0
// fails, after which it will:
// 1. reattempt backend 0, and
// 2. go through the remaining ring entries to find one in READY.
// Since the entries in the ring are fairly evenly distributed and we have
// unused ports to fill the ring, it is almost guaranteed that the picker
// will encounter some non-READY entries and skip them, as designed.
ShutdownBackend(0);
CheckRpcSendFailure(
CheckRpcSendFailureOptions().set_rpc_options(rpc_options));
StartBackend(1);
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
WaitForBackend(1, WaitForBackendOptions(), rpc_options);
}
// Test that unsupported hash policy types are all ignored when they precede
// a supported policy.
TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) {
CreateAndStartBackends(2);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
true);
auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
hash_policy_unsupported_3->mutable_query_parameter()->set_name(
"query_parameter");
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
CheckRpcSendOk(100);
bool found = false;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() > 0) {
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
<< "backend " << i;
EXPECT_FALSE(found) << "backend " << i;
found = true;
}
}
EXPECT_TRUE(found);
}
// Test that we NACK when the ring hash policy has an invalid hash function
// (something other than XX_HASH).
TEST_P(RingHashTest, InvalidHashFunction) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->set_hash_function(
Cluster::RingHashLbConfig::MURMUR_HASH_2);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(
response_state->error_message,
::testing::HasSubstr("ring hash lb config has invalid hash function."));
}
// Test that we NACK when the ring hash policy has an invalid minimum ring
// size.
TEST_P(RingHashTest, InvalidMinimumRingSize) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
0);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(
"min_ring_size is not in the range of 1 to 8388608."));
}
// Test that we NACK when the ring hash policy has an invalid maximum ring
// size.
TEST_P(RingHashTest, InvalidMaximumRingSize) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
8388609);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(
"max_ring_size is not in the range of 1 to 8388608."));
}
// Test that we NACK when the ring hash policy has min_ring_size greater
// than max_ring_size.
TEST_P(RingHashTest, InvalidRingSizeMinGreaterThanMax) {
CreateAndStartBackends(1);
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
5000);
cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
5001);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForCdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(
"min_ring_size cannot be greater than max_ring_size."));
}
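// A hedged sketch, not part of the original file, of the CDS validation
// rules exercised by the three NACK tests above. The function is an
// assumption modeled on the expected error messages, not the real parser.
std::string ValidateRingHashConfigSketch(bool is_xx_hash,
uint64_t min_ring_size,
uint64_t max_ring_size) {
constexpr uint64_t kRingSizeCap = 8388608;  // 2^23, per the messages above
if (!is_xx_hash) return "ring hash lb config has invalid hash function.";
if (min_ring_size < 1 || min_ring_size > kRingSizeCap)
return "min_ring_size is not in the range of 1 to 8388608.";
if (max_ring_size < 1 || max_ring_size > kRingSizeCap)
return "max_ring_size is not in the range of 1 to 8388608.";
if (min_ring_size > max_ring_size)
return "min_ring_size cannot be greater than max_ring_size.";
return "";  // empty string means the config is valid
}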
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
// Make the backup poller poll very frequently in order to pick up
// updates from all the subchannels' FDs.
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
#if TARGET_OS_IPHONE
// Workaround Apple CFStream bug
gpr_setenv("grpc_cfstream", "0");
#endif
grpc_init();
grpc::testing::ConnectionAttemptInjector::Init();
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}