From bea59115695b5c714b45e4cb92c56550988973da Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Sat, 16 Apr 2022 14:42:43 -0700 Subject: [PATCH] xds_end2end_test: Move ring_hash tests to their own file (#29287) * move some code around * remove num_backends parameter from XdsEnd2endTest * remove use_xds_enabled_server param from XdsEnd2endTest * remove xds_resource_does_not_exist_timeout_ms param from XdsEnd2endTest * remove client_load_reporting_interval_seconds param from XdsEnd2endTest * start moving CreateAndStartBackends() into individual tests * finish moving CreateAndStartBackends() into individual tests * remove unused variable * remove SetEdsResourceWithDelay * fix test flake * clang-tidy * clang-format * move test framework to its own library * fix build * clang-format * fix windows build * rename TestType to XdsTestType * move BackendServiceImpl inside of BackendServerThread * clang-format * move AdminServerThread to CSDS test suite * move ring_hash tests to their own file * generate_projects * remove unnecessary deps * re-add flaky tag * clang-format --- CMakeLists.txt | 173 +++ build_autogenerated.yaml | 57 + test/cpp/end2end/xds/BUILD | 29 + test/cpp/end2end/xds/xds_end2end_test.cc | 889 +--------------- .../end2end/xds/xds_ring_hash_end2end_test.cc | 985 ++++++++++++++++++ 5 files changed, 1245 insertions(+), 888 deletions(-) create mode 100644 test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 826bc66ae64..224f1f1f4d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1194,6 +1194,9 @@ if(gRPC_BUILD_TESTS) endif() add_dependencies(buildtests_cxx xds_interop_client) add_dependencies(buildtests_cxx xds_interop_server) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx xds_ring_hash_end2end_test) + endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx xds_rls_end2end_test) endif() @@ -18044,6 +18047,176 @@ target_link_libraries(xds_interop_server ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(xds_ring_hash_end2end_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h + test/cpp/end2end/connection_delay_injector.cc + test/cpp/end2end/test_service_impl.cc + test/cpp/end2end/xds/xds_end2end_test_lib.cc + test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc + test/cpp/end2end/xds/xds_server.cc + test/cpp/util/tls_test_utils.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(xds_ring_hash_end2end_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(xds_ring_hash_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index c8b02e91491..d1cf320abbe 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -8679,6 +8679,63 @@ targets: - grpcpp_channelz - grpc_test_util - grpc++_test_config +- name: xds_ring_hash_end2end_test + gtest: true + build: test + run: false + language: c++ + headers: + - test/cpp/end2end/connection_delay_injector.h + - test/cpp/end2end/counted_service.h + - test/cpp/end2end/test_service_impl.h + - test/cpp/end2end/xds/xds_end2end_test_lib.h + - test/cpp/end2end/xds/xds_server.h + - test/cpp/util/tls_test_utils.h + src: + - src/proto/grpc/testing/duplicate/echo_duplicate.proto + - src/proto/grpc/testing/echo.proto + - src/proto/grpc/testing/echo_messages.proto + - src/proto/grpc/testing/simple_messages.proto + - src/proto/grpc/testing/xds/ads_for_test.proto + - src/proto/grpc/testing/xds/eds_for_test.proto + - src/proto/grpc/testing/xds/lrs_for_test.proto + - src/proto/grpc/testing/xds/v3/address.proto + - src/proto/grpc/testing/xds/v3/ads.proto + - src/proto/grpc/testing/xds/v3/aggregate_cluster.proto + - src/proto/grpc/testing/xds/v3/base.proto + - src/proto/grpc/testing/xds/v3/cluster.proto + - src/proto/grpc/testing/xds/v3/config_source.proto + - src/proto/grpc/testing/xds/v3/discovery.proto + - src/proto/grpc/testing/xds/v3/endpoint.proto + - src/proto/grpc/testing/xds/v3/expr.proto + - src/proto/grpc/testing/xds/v3/extension.proto + - src/proto/grpc/testing/xds/v3/http_connection_manager.proto + - src/proto/grpc/testing/xds/v3/http_filter_rbac.proto + - src/proto/grpc/testing/xds/v3/listener.proto + - src/proto/grpc/testing/xds/v3/load_report.proto + - src/proto/grpc/testing/xds/v3/lrs.proto + - src/proto/grpc/testing/xds/v3/metadata.proto + - src/proto/grpc/testing/xds/v3/path.proto + - src/proto/grpc/testing/xds/v3/percent.proto + - src/proto/grpc/testing/xds/v3/protocol.proto + - src/proto/grpc/testing/xds/v3/range.proto + - src/proto/grpc/testing/xds/v3/rbac.proto + - src/proto/grpc/testing/xds/v3/regex.proto + - src/proto/grpc/testing/xds/v3/route.proto + - src/proto/grpc/testing/xds/v3/router.proto + - src/proto/grpc/testing/xds/v3/string.proto + - test/cpp/end2end/connection_delay_injector.cc + - test/cpp/end2end/test_service_impl.cc + - test/cpp/end2end/xds/xds_end2end_test_lib.cc + - test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc + - test/cpp/end2end/xds/xds_server.cc + - test/cpp/util/tls_test_utils.cc + deps: + - grpc++_test_util + platforms: + - linux + - posix + - mac - name: xds_rls_end2end_test gtest: true build: test diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index 8067eb96ef2..acfa723dd27 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -219,6 +219,35 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "xds_ring_hash_end2end_test", + size = "large", + srcs = ["xds_ring_hash_end2end_test.cc"], + external_deps = [ + "gtest", + ], + flaky = True, # TODO(b/144705388) + linkstatic = True, # Fixes dyld error on MacOS + tags = [ + "no_test_ios", + "no_windows", + ], # TODO(jtattermusch): fix test on windows + deps = [ + ":xds_end2end_test_lib", + "//:gpr", + "//:grpc", + "//:grpc++", + "//:grpc_resolver_fake", + "//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto", + "//src/proto/grpc/testing/xds/v3:cluster_proto", + "//src/proto/grpc/testing/xds/v3:endpoint_proto", + "//src/proto/grpc/testing/xds/v3:listener_proto", + "//src/proto/grpc/testing/xds/v3:route_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/end2end:connection_delay_injector", + ], +) + grpc_cc_test( name = "xds_rls_end2end_test", size = "large", diff --git a/test/cpp/end2end/xds/xds_end2end_test.cc b/test/cpp/end2end/xds/xds_end2end_test.cc index 611c3bf68e7..63b9ef1b152 100644 --- a/test/cpp/end2end/xds/xds_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_end2end_test.cc @@ -652,14 +652,6 @@ class CdsTest : public XdsEnd2endTest { return addresses; } - std::string CreateMetadataValueThatHashesToBackendPort(int port) { - return absl::StrCat(ipv6_only_ ? "[::1]" : "127.0.0.1", ":", port, "_0"); - } - - std::string CreateMetadataValueThatHashesToBackend(int index) { - return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port()); - } - grpc_core::RefCountedPtr logical_dns_cluster_resolver_response_generator_; }; @@ -1004,136 +996,6 @@ TEST_P(CdsTest, AggregateClusterDiamondDependency) { WaitForBackend(0); } -TEST_P(CdsTest, AggregateClusterFallBackFromRingHashAtStartup) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(2); - const char* kNewCluster1Name = "new_cluster_1"; - const char* kNewEdsService1Name = "new_eds_service_name_1"; - const char* kNewCluster2Name = "new_cluster_2"; - const char* kNewEdsService2Name = "new_eds_service_name_2"; - // Populate new EDS resources. - EdsResourceArgs args1({ - {"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}}, - }); - EdsResourceArgs args2({ - {"locality0", CreateEndpointsForBackends()}, - }); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsService1Name)); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args2, kNewEdsService2Name)); - // Populate new CDS resources. - Cluster new_cluster1 = default_cluster_; - new_cluster1.set_name(kNewCluster1Name); - new_cluster1.mutable_eds_cluster_config()->set_service_name( - kNewEdsService1Name); - balancer_->ads_service()->SetCdsResource(new_cluster1); - Cluster new_cluster2 = default_cluster_; - new_cluster2.set_name(kNewCluster2Name); - new_cluster2.mutable_eds_cluster_config()->set_service_name( - kNewEdsService2Name); - balancer_->ads_service()->SetCdsResource(new_cluster2); - // Create Aggregate Cluster - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kNewCluster1Name); - cluster_config.add_clusters(kNewCluster2Name); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Set up route with channel id hashing - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - // Verifying that we are using ring hash as only 1 endpoint is receiving all - // the traffic. - CheckRpcSendOk(100); - bool found = false; - for (size_t i = 0; i < backends_.size(); ++i) { - if (backends_[i]->backend_service()->request_count() > 0) { - EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) - << "backend " << i; - EXPECT_FALSE(found) << "backend " << i; - found = true; - } - } - EXPECT_TRUE(found); -} - -TEST_P(CdsTest, AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(1); - const char* kEdsClusterName = "eds_cluster"; - const char* kLogicalDNSClusterName = "logical_dns_cluster"; - // Populate EDS resource. - EdsResourceArgs args({ - {"locality0", - {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}, - kDefaultLocalityWeight, - 0}, - {"locality1", - {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}, - kDefaultLocalityWeight, - 1}, - }); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // Populate new CDS resources. - Cluster eds_cluster = default_cluster_; - eds_cluster.set_name(kEdsClusterName); - balancer_->ads_service()->SetCdsResource(eds_cluster); - // Populate LOGICAL_DNS cluster. - auto logical_dns_cluster = default_cluster_; - logical_dns_cluster.set_name(kLogicalDNSClusterName); - logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); - auto* address = logical_dns_cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address(); - address->set_address(kServerName); - address->set_port_value(443); - balancer_->ads_service()->SetCdsResource(logical_dns_cluster); - // Create Aggregate Cluster - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kEdsClusterName); - cluster_config.add_clusters(kLogicalDNSClusterName); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Set up route with channel id hashing - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - // Set Logical DNS result - { - grpc_core::ExecCtx exec_ctx; - grpc_core::Resolver::Result result; - result.addresses = CreateAddressListFromPortList(GetBackendPorts()); - logical_dns_cluster_resolver_response_generator_->SetResponse( - std::move(result)); - } - // Inject connection delay to make this act more realistically. - ConnectionDelayInjector delay_injector( - grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor()); - // Send RPC. Need the timeout to be long enough to account for the - // subchannel connection delays. - CheckRpcSendOk(1, RpcOptions().set_timeout_ms(3500)); -} - // This test covers a bug found in the following scenario: // 1. P0 reports TRANSIENT_FAILURE, so we start connecting to P1. // 2. While P1 is still in CONNECTING, P0 goes back to READY, so we @@ -1681,343 +1543,9 @@ TEST_P(CdsTest, WrongLrsServer) { ::testing::HasSubstr("LRS ConfigSource is not self.")); } -// Tests that ring hash policy that hashes using channel id ensures all RPCs -// to go 1 particular backend. -TEST_P(CdsTest, RingHashChannelIdHashing) { - CreateAndStartBackends(4); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - CheckRpcSendOk(100); - bool found = false; - for (size_t i = 0; i < backends_.size(); ++i) { - if (backends_[i]->backend_service()->request_count() > 0) { - EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) - << "backend " << i; - EXPECT_FALSE(found) << "backend " << i; - found = true; - } - } - EXPECT_TRUE(found); -} - -// Tests that ring hash policy that hashes using a header value can spread -// RPCs across all the backends. -TEST_P(CdsTest, RingHashHeaderHashing) { - CreateAndStartBackends(4); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("address_hash"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // Note each type of RPC will contains a header value that will always be - // hashed to a specific backend as the header value matches the value used - // to create the entry in the ring. - std::vector> metadata = { - {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; - std::vector> metadata1 = { - {"address_hash", CreateMetadataValueThatHashesToBackend(1)}}; - std::vector> metadata2 = { - {"address_hash", CreateMetadataValueThatHashesToBackend(2)}}; - std::vector> metadata3 = { - {"address_hash", CreateMetadataValueThatHashesToBackend(3)}}; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1)); - const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2)); - const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3)); - WaitForBackend(0, WaitForBackendOptions(), rpc_options); - WaitForBackend(1, WaitForBackendOptions(), rpc_options1); - WaitForBackend(2, WaitForBackendOptions(), rpc_options2); - WaitForBackend(3, WaitForBackendOptions(), rpc_options3); - CheckRpcSendOk(100, rpc_options); - CheckRpcSendOk(100, rpc_options1); - CheckRpcSendOk(100, rpc_options2); - CheckRpcSendOk(100, rpc_options3); - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(100, backends_[i]->backend_service()->request_count()); - } -} - -// Tests that ring hash policy that hashes using a header value and regex -// rewrite to aggregate RPCs to 1 backend. -TEST_P(CdsTest, RingHashHeaderHashingWithRegexRewrite) { - CreateAndStartBackends(4); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("address_hash"); - hash_policy->mutable_header() - ->mutable_regex_rewrite() - ->mutable_pattern() - ->set_regex("[0-9]+"); - hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution( - "foo"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; - std::vector> metadata1 = { - {"address_hash", CreateMetadataValueThatHashesToBackend(1)}}; - std::vector> metadata2 = { - {"address_hash", CreateMetadataValueThatHashesToBackend(2)}}; - std::vector> metadata3 = { - {"address_hash", CreateMetadataValueThatHashesToBackend(3)}}; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1)); - const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2)); - const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3)); - CheckRpcSendOk(100, rpc_options); - CheckRpcSendOk(100, rpc_options1); - CheckRpcSendOk(100, rpc_options2); - CheckRpcSendOk(100, rpc_options3); - bool found = false; - for (size_t i = 0; i < backends_.size(); ++i) { - if (backends_[i]->backend_service()->request_count() > 0) { - EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400) - << "backend " << i; - EXPECT_FALSE(found) << "backend " << i; - found = true; - } - } - EXPECT_TRUE(found); -} - -// Tests that ring hash policy that hashes using a random value. -TEST_P(CdsTest, RingHashNoHashPolicy) { - CreateAndStartBackends(2); - const double kDistribution50Percent = 0.5; - const double kErrorTolerance = 0.05; - const uint32_t kRpcTimeoutMs = 10000; - const size_t kNumRpcs = - ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); - auto cluster = default_cluster_; - // Increasing min ring size for random distribution. - cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( - 100000); - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // TODO(donnadionne): remove extended timeout after ring creation - // optimization. - WaitForAllBackends(0, 2, WaitForBackendOptions(), - RpcOptions().set_timeout_ms(kRpcTimeoutMs)); - CheckRpcSendOk(kNumRpcs); - const int request_count_1 = backends_[0]->backend_service()->request_count(); - const int request_count_2 = backends_[1]->backend_service()->request_count(); - EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, - ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); - EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, - ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); -} - -// Test that ring hash policy evaluation will continue past the terminal -// policy if no results are produced yet. -TEST_P(CdsTest, RingHashContinuesPastTerminalPolicyThatDoesNotProduceResult) { - CreateAndStartBackends(2); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("header_not_present"); - hash_policy->set_terminal(true); - auto* hash_policy2 = route->mutable_route()->add_hash_policy(); - hash_policy2->mutable_header()->set_header_name("address_hash"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - CheckRpcSendOk(100, rpc_options); - EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100); - EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0); -} - -// Test random hash is used when header hashing specified a header field that -// the RPC did not have. -TEST_P(CdsTest, RingHashOnHeaderThatIsNotPresent) { - CreateAndStartBackends(2); - const double kDistribution50Percent = 0.5; - const double kErrorTolerance = 0.05; - const uint32_t kRpcTimeoutMs = 10000; - const size_t kNumRpcs = - ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); - auto cluster = default_cluster_; - // Increasing min ring size for random distribution. - cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( - 100000); - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("header_not_present"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"unmatched_header", absl::StrFormat("%" PRIu32, rand())}, - }; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - // TODO(donnadionne): remove extended timeout after ring creation - // optimization. - WaitForAllBackends(0, 2, WaitForBackendOptions(), - RpcOptions().set_timeout_ms(kRpcTimeoutMs)); - CheckRpcSendOk(kNumRpcs, rpc_options); - const int request_count_1 = backends_[0]->backend_service()->request_count(); - const int request_count_2 = backends_[1]->backend_service()->request_count(); - EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, - ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); - EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, - ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); -} - -// Test random hash is used when only unsupported hash policies are -// configured. -TEST_P(CdsTest, RingHashUnsupportedHashPolicyDefaultToRandomHashing) { - CreateAndStartBackends(2); - const double kDistribution50Percent = 0.5; - const double kErrorTolerance = 0.05; - const uint32_t kRpcTimeoutMs = 10000; - const size_t kNumRpcs = - ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); - auto cluster = default_cluster_; - // Increasing min ring size for random distribution. - cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( - 100000); - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy(); - hash_policy_unsupported_1->mutable_cookie()->set_name("cookie"); - auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy(); - hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip( - true); - auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy(); - hash_policy_unsupported_3->mutable_query_parameter()->set_name( - "query_parameter"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // TODO(donnadionne): remove extended timeout after ring creation - // optimization. - WaitForAllBackends(0, 2, WaitForBackendOptions(), - RpcOptions().set_timeout_ms(kRpcTimeoutMs)); - CheckRpcSendOk(kNumRpcs); - const int request_count_1 = backends_[0]->backend_service()->request_count(); - const int request_count_2 = backends_[1]->backend_service()->request_count(); - EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, - ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); - EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, - ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); -} - -// Tests that ring hash policy that hashes using a random value can spread -// RPCs across all the backends according to locality weight. -TEST_P(CdsTest, RingHashRandomHashingDistributionAccordingToEndpointWeight) { - CreateAndStartBackends(2); - const size_t kWeight1 = 1; - const size_t kWeight2 = 2; - const size_t kWeightTotal = kWeight1 + kWeight2; - const double kWeight33Percent = static_cast(kWeight1) / kWeightTotal; - const double kWeight66Percent = static_cast(kWeight2) / kWeightTotal; - const double kErrorTolerance = 0.05; - const uint32_t kRpcTimeoutMs = 10000; - const size_t kNumRpcs = - ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance); - auto cluster = default_cluster_; - // Increasing min ring size for random distribution. - cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( - 100000); - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - EdsResourceArgs args({{"locality0", - {CreateEndpoint(0, HealthStatus::UNKNOWN, 1), - CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // TODO(donnadionne): remove extended timeout after ring creation - // optimization. - WaitForAllBackends(0, 2, WaitForBackendOptions(), - RpcOptions().set_timeout_ms(kRpcTimeoutMs)); - CheckRpcSendOk(kNumRpcs); - const int weight_33_request_count = - backends_[0]->backend_service()->request_count(); - const int weight_66_request_count = - backends_[1]->backend_service()->request_count(); - EXPECT_THAT(static_cast(weight_33_request_count) / kNumRpcs, - ::testing::DoubleNear(kWeight33Percent, kErrorTolerance)); - EXPECT_THAT(static_cast(weight_66_request_count) / kNumRpcs, - ::testing::DoubleNear(kWeight66Percent, kErrorTolerance)); -} - -// Tests that ring hash policy that hashes using a random value can spread -// RPCs across all the backends according to locality weight. -TEST_P(CdsTest, - RingHashRandomHashingDistributionAccordingToLocalityAndEndpointWeight) { - CreateAndStartBackends(2); - const size_t kWeight1 = 1 * 1; - const size_t kWeight2 = 2 * 2; - const size_t kWeightTotal = kWeight1 + kWeight2; - const double kWeight20Percent = static_cast(kWeight1) / kWeightTotal; - const double kWeight80Percent = static_cast(kWeight2) / kWeightTotal; - const double kErrorTolerance = 0.05; - const uint32_t kRpcTimeoutMs = 10000; - const size_t kNumRpcs = - ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance); - auto cluster = default_cluster_; - // Increasing min ring size for random distribution. - cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( - 100000); - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - EdsResourceArgs args( - {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1}, - {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // TODO(donnadionne): remove extended timeout after ring creation - // optimization. - WaitForAllBackends(0, 2, WaitForBackendOptions(), - RpcOptions().set_timeout_ms(kRpcTimeoutMs)); - CheckRpcSendOk(kNumRpcs); - const int weight_20_request_count = - backends_[0]->backend_service()->request_count(); - const int weight_80_request_count = - backends_[1]->backend_service()->request_count(); - EXPECT_THAT(static_cast(weight_20_request_count) / kNumRpcs, - ::testing::DoubleNear(kWeight20Percent, kErrorTolerance)); - EXPECT_THAT(static_cast(weight_80_request_count) / kNumRpcs, - ::testing::DoubleNear(kWeight80Percent, kErrorTolerance)); -} - // Tests round robin is not implacted by the endpoint weight, and that the // localities in a locality map are picked according to their weights. -TEST_P(CdsTest, RingHashEndpointWeightDoesNotImpactWeightedRoundRobin) { +TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) { CreateAndStartBackends(2); const int kLocalityWeight0 = 2; const int kLocalityWeight1 = 8; @@ -2056,421 +1584,6 @@ TEST_P(CdsTest, RingHashEndpointWeightDoesNotImpactWeightedRoundRobin) { ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance)); } -// Tests that ring hash policy that hashes using a fixed string ensures all -// RPCs to go 1 particular backend; and that subsequent hashing policies are -// ignored due to the setting of terminal. -TEST_P(CdsTest, RingHashFixedHashingTerminalPolicy) { - CreateAndStartBackends(2); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("fixed_string"); - hash_policy->set_terminal(true); - auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy(); - hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"fixed_string", "fixed_value"}, - {"random_string", absl::StrFormat("%" PRIu32, rand())}, - }; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - CheckRpcSendOk(100, rpc_options); - bool found = false; - for (size_t i = 0; i < backends_.size(); ++i) { - if (backends_[i]->backend_service()->request_count() > 0) { - EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) - << "backend " << i; - EXPECT_FALSE(found) << "backend " << i; - found = true; - } - } - EXPECT_TRUE(found); -} - -// Test that the channel will go from idle to ready via connecting; -// (tho it is not possible to catch the connecting state before moving to -// ready) -TEST_P(CdsTest, RingHashIdleToReady) { - CreateAndStartBackends(1); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); - CheckRpcSendOk(); - EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false)); -} - -// Test that the channel will transition to READY once it starts -// connecting even if there are no RPCs being sent to the picker. -TEST_P(CdsTest, RingHashContinuesConnectingWithoutPicks) { - // Create EDS resource. - CreateAndStartBackends(1); - auto non_existant_endpoint = MakeNonExistantEndpoint(); - EdsResourceArgs args( - {{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // Change CDS resource to use RING_HASH. - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - // Add hash policy to RDS resource. - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("address_hash"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - // A connection injector that cancels the RPC after seeing the - // connection attempt for the non-existant endpoint. - class ConnectionInjector : public ConnectionAttemptInjector { - public: - explicit ConnectionInjector(int port) : port_(port) {} - - void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) override { - { - grpc_core::MutexLock lock(&mu_); - const int port = grpc_sockaddr_get_port(addr); - gpr_log(GPR_INFO, "==> HandleConnection(): seen_port_=%d, port=%d", - seen_port_, port); - // Initial attempt should be for port0_, which should fail. - // Cancel the RPC at this point, so that it's no longer - // queued when the LB policy updates the picker. - if (!seen_port_ && port == port_) { - gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT"); - seen_port_ = true; - cond_.Signal(); - } - } - AttemptConnection(closure, ep, interested_parties, channel_args, addr, - deadline); - } - - void WaitForP0ConnectionAttempt() { - grpc_core::MutexLock lock(&mu_); - while (!seen_port_) { - cond_.Wait(&mu_); - } - } - - private: - const int port_; - - grpc_core::Mutex mu_; - grpc_core::CondVar cond_; - bool seen_port_ ABSL_GUARDED_BY(mu_) = false; - }; - ConnectionInjector connection_injector(non_existant_endpoint.port); - // A long-running RPC, just used to send the RPC in another thread. - LongRunningRpc rpc; - std::vector> metadata = { - {"address_hash", - CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}}; - rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata( - std::move(metadata))); - // Wait for the RPC to trigger the P0 connection attempt, then cancel it. - connection_injector.WaitForP0ConnectionAttempt(); - rpc.CancelRpc(); - // Wait for channel to become connected without any pending RPC. - EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5))); - // RPC should have been cancelled. - EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code()); - // Make sure the backend did not get any requests. - EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count()); -} - -// Test that when the first pick is down leading to a transient failure, we -// will move on to the next ring hash entry. -TEST_P(CdsTest, RingHashTransientFailureCheckNextOne) { - CreateAndStartBackends(1); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("address_hash"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - std::vector endpoints; - const int unused_port = grpc_pick_unused_port_or_die(); - endpoints.emplace_back(unused_port); - endpoints.emplace_back(backends_[0]->port()); - EdsResourceArgs args({{"locality0", std::move(endpoints)}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"address_hash", - CreateMetadataValueThatHashesToBackendPort(unused_port)}}; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - WaitForBackend(0, WaitForBackendOptions(), rpc_options); - CheckRpcSendOk(100, rpc_options); -} - -// Test that when a backend goes down, we will move on to the next subchannel -// (with a lower priority). When the backend comes back up, traffic will move -// back. -TEST_P(CdsTest, RingHashSwitchToLowerPrioirtyAndThenBack) { - CreateAndStartBackends(2); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("address_hash"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({ - {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight, - 0}, - {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight, - 1}, - }); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - WaitForBackend(0, WaitForBackendOptions(), rpc_options); - ShutdownBackend(0); - WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true), - rpc_options); - StartBackend(0); - WaitForBackend(0, WaitForBackendOptions(), rpc_options); - CheckRpcSendOk(100, rpc_options); - EXPECT_EQ(100, backends_[0]->backend_service()->request_count()); - EXPECT_EQ(0, backends_[1]->backend_service()->request_count()); -} - -// Test that when all backends are down, we will keep reattempting. -TEST_P(CdsTest, RingHashAllFailReattempt) { - CreateAndStartBackends(1); - const uint32_t kConnectionTimeoutMilliseconds = 5000; - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("address_hash"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args( - {{"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); - ShutdownBackend(0); - CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options( - RpcOptions().set_metadata(std::move(metadata)))); - StartBackend(0); - // Ensure we are actively connecting without any traffic. - EXPECT_TRUE(channel_->WaitForConnected( - grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); -} - -// Test that when all backends are down and then up, we may pick a TF backend -// and we will then jump to ready backend. -TEST_P(CdsTest, RingHashTransientFailureSkipToAvailableReady) { - CreateAndStartBackends(2); - const uint32_t kConnectionTimeoutMilliseconds = 5000; - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_header()->set_header_name("address_hash"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - // Make sure we include some unused ports to fill the ring. - EdsResourceArgs args({ - {"locality0", - {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(), - MakeNonExistantEndpoint()}}, - }); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - std::vector> metadata = { - {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; - const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); - ShutdownBackend(0); - ShutdownBackend(1); - CheckRpcSendFailure( - CheckRpcSendFailureOptions().set_rpc_options(rpc_options)); - EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false)); - // Bring up 0, should be picked as the RPC is hashed to it. - StartBackend(0); - EXPECT_TRUE(channel_->WaitForConnected( - grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); - WaitForBackend(0, WaitForBackendOptions(), rpc_options); - // Bring down 0 and bring up 1. - // Note the RPC contains a header value that will always be hashed to - // backend 0. So by purposely bring down backend 0 and bring up another - // backend, this will ensure Picker's first choice of backend 0 will fail - // and it will - // 1. reattempt backend 0 and - // 2. go through the remaining subchannels to find one in READY. - // Since the the entries in the ring is pretty distributed and we have - // unused ports to fill the ring, it is almost guaranteed that the Picker - // will go through some non-READY entries and skip them as per design. - ShutdownBackend(0); - CheckRpcSendFailure( - CheckRpcSendFailureOptions().set_rpc_options(rpc_options)); - StartBackend(1); - EXPECT_TRUE(channel_->WaitForConnected( - grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); - WaitForBackend(1, WaitForBackendOptions(), rpc_options); -} - -// Test unspported hash policy types are all ignored before a supported -// policy. -TEST_P(CdsTest, RingHashUnsupportedHashPolicyUntilChannelIdHashing) { - CreateAndStartBackends(2); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy(); - hash_policy_unsupported_1->mutable_cookie()->set_name("cookie"); - auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy(); - hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip( - true); - auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy(); - hash_policy_unsupported_3->mutable_query_parameter()->set_name( - "query_parameter"); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - CheckRpcSendOk(100); - bool found = false; - for (size_t i = 0; i < backends_.size(); ++i) { - if (backends_[i]->backend_service()->request_count() > 0) { - EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) - << "backend " << i; - EXPECT_FALSE(found) << "backend " << i; - found = true; - } - } - EXPECT_TRUE(found); -} - -// Test we nack when ring hash policy has invalid hash function (something -// other than XX_HASH. -TEST_P(CdsTest, RingHashPolicyHasInvalidHashFunction) { - CreateAndStartBackends(1); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - cluster.mutable_ring_hash_lb_config()->set_hash_function( - Cluster::RingHashLbConfig::MURMUR_HASH_2); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT( - response_state->error_message, - ::testing::HasSubstr("ring hash lb config has invalid hash function.")); -} - -// Test we nack when ring hash policy has invalid ring size. -TEST_P(CdsTest, RingHashPolicyHasInvalidMinimumRingSize) { - CreateAndStartBackends(1); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( - 0); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr( - "min_ring_size is not in the range of 1 to 8388608.")); -} - -// Test we nack when ring hash policy has invalid ring size. -TEST_P(CdsTest, RingHashPolicyHasInvalidMaxmumRingSize) { - CreateAndStartBackends(1); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value( - 8388609); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr( - "max_ring_size is not in the range of 1 to 8388608.")); -} - -// Test we nack when ring hash policy has invalid ring size. -TEST_P(CdsTest, RingHashPolicyHasInvalidRingSizeMinGreaterThanMax) { - CreateAndStartBackends(1); - auto cluster = default_cluster_; - cluster.set_lb_policy(Cluster::RING_HASH); - cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value( - 5000); - cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( - 5001); - balancer_->ads_service()->SetCdsResource(cluster); - auto new_route_config = default_route_config_; - auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - auto* hash_policy = route->mutable_route()->add_hash_policy(); - hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); - SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, - new_route_config); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr( - "min_ring_size cannot be greater than max_ring_size.")); -} - class XdsSecurityTest : public XdsEnd2endTest { protected: void SetUp() override { diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc new file mode 100644 index 00000000000..562e9d66a9c --- /dev/null +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -0,0 +1,985 @@ +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include +#include + +#include +#include + +#include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" + +#include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/gpr/env.h" +#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" +#include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h" +#include "test/cpp/end2end/connection_delay_injector.h" +#include "test/cpp/end2end/xds/xds_end2end_test_lib.h" + +namespace grpc { +namespace testing { +namespace { + +using ::envoy::config::cluster::v3::CustomClusterType; +using ::envoy::config::endpoint::v3::HealthStatus; +using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig; + +class RingHashTest : public XdsEnd2endTest { + protected: + void SetUp() override { + logical_dns_cluster_resolver_response_generator_ = + grpc_core::MakeRefCounted(); + InitClient(); + ChannelArguments args; + args.SetPointerWithVtable( + GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR, + logical_dns_cluster_resolver_response_generator_.get(), + &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable); + ResetStub(/*failover_timeout_ms=*/0, &args); + } + + grpc_core::ServerAddressList CreateAddressListFromPortList( + const std::vector& ports) { + grpc_core::ServerAddressList addresses; + for (int port : ports) { + absl::StatusOr lb_uri = grpc_core::URI::Parse( + absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port)); + GPR_ASSERT(lb_uri.ok()); + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(*lb_uri, &address)); + addresses.emplace_back(address.addr, address.len, nullptr); + } + return addresses; + } + + std::string CreateMetadataValueThatHashesToBackendPort(int port) { + return absl::StrCat(ipv6_only_ ? "[::1]" : "127.0.0.1", ":", port, "_0"); + } + + std::string CreateMetadataValueThatHashesToBackend(int index) { + return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port()); + } + + grpc_core::RefCountedPtr + logical_dns_cluster_resolver_response_generator_; +}; + +// Run both with and without load reporting, just for test coverage. +INSTANTIATE_TEST_SUITE_P( + XdsTest, RingHashTest, + ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()), + &XdsTestType::Name); + +TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(2); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + // Populate new EDS resources. + EdsResourceArgs args1({ + {"locality0", {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}}, + }); + EdsResourceArgs args2({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsService1Name)); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args2, kNewEdsService2Name)); + // Populate new CDS resources. + Cluster new_cluster1 = default_cluster_; + new_cluster1.set_name(kNewCluster1Name); + new_cluster1.mutable_eds_cluster_config()->set_service_name( + kNewEdsService1Name); + balancer_->ads_service()->SetCdsResource(new_cluster1); + Cluster new_cluster2 = default_cluster_; + new_cluster2.set_name(kNewCluster2Name); + new_cluster2.mutable_eds_cluster_config()->set_service_name( + kNewEdsService2Name); + balancer_->ads_service()->SetCdsResource(new_cluster2); + // Create Aggregate Cluster + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kNewCluster1Name); + cluster_config.add_clusters(kNewCluster2Name); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Set up route with channel id hashing + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + // Verifying that we are using ring hash as only 1 endpoint is receiving all + // the traffic. + CheckRpcSendOk(100); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); +} + +TEST_P(RingHashTest, + AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(1); + const char* kEdsClusterName = "eds_cluster"; + const char* kLogicalDNSClusterName = "logical_dns_cluster"; + // Populate EDS resource. + EdsResourceArgs args({ + {"locality0", + {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}, + kDefaultLocalityWeight, + 0}, + {"locality1", + {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()}, + kDefaultLocalityWeight, + 1}, + }); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Populate new CDS resources. + Cluster eds_cluster = default_cluster_; + eds_cluster.set_name(kEdsClusterName); + balancer_->ads_service()->SetCdsResource(eds_cluster); + // Populate LOGICAL_DNS cluster. + auto logical_dns_cluster = default_cluster_; + logical_dns_cluster.set_name(kLogicalDNSClusterName); + logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); + auto* address = logical_dns_cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + address->set_address(kServerName); + address->set_port_value(443); + balancer_->ads_service()->SetCdsResource(logical_dns_cluster); + // Create Aggregate Cluster + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kEdsClusterName); + cluster_config.add_clusters(kLogicalDNSClusterName); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Set up route with channel id hashing + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + // Set Logical DNS result + { + grpc_core::ExecCtx exec_ctx; + grpc_core::Resolver::Result result; + result.addresses = CreateAddressListFromPortList(GetBackendPorts()); + logical_dns_cluster_resolver_response_generator_->SetResponse( + std::move(result)); + } + // Inject connection delay to make this act more realistically. + ConnectionDelayInjector delay_injector( + grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor()); + // Send RPC. Need the timeout to be long enough to account for the + // subchannel connection delays. + CheckRpcSendOk(1, RpcOptions().set_timeout_ms(3500)); +} + +// Tests that ring hash policy that hashes using channel id ensures all RPCs +// to go 1 particular backend. +TEST_P(RingHashTest, ChannelIdHashing) { + CreateAndStartBackends(4); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + CheckRpcSendOk(100); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); +} + +// Tests that ring hash policy that hashes using a header value can spread +// RPCs across all the backends. +TEST_P(RingHashTest, HeaderHashing) { + CreateAndStartBackends(4); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Note each type of RPC will contains a header value that will always be + // hashed to a specific backend as the header value matches the value used + // to create the entry in the ring. + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + std::vector> metadata1 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(1)}}; + std::vector> metadata2 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(2)}}; + std::vector> metadata3 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(3)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1)); + const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2)); + const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3)); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + WaitForBackend(1, WaitForBackendOptions(), rpc_options1); + WaitForBackend(2, WaitForBackendOptions(), rpc_options2); + WaitForBackend(3, WaitForBackendOptions(), rpc_options3); + CheckRpcSendOk(100, rpc_options); + CheckRpcSendOk(100, rpc_options1); + CheckRpcSendOk(100, rpc_options2); + CheckRpcSendOk(100, rpc_options3); + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(100, backends_[i]->backend_service()->request_count()); + } +} + +// Tests that ring hash policy that hashes using a header value and regex +// rewrite to aggregate RPCs to 1 backend. +TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) { + CreateAndStartBackends(4); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + hash_policy->mutable_header() + ->mutable_regex_rewrite() + ->mutable_pattern() + ->set_regex("[0-9]+"); + hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution( + "foo"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + std::vector> metadata1 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(1)}}; + std::vector> metadata2 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(2)}}; + std::vector> metadata3 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(3)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1)); + const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2)); + const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3)); + CheckRpcSendOk(100, rpc_options); + CheckRpcSendOk(100, rpc_options1); + CheckRpcSendOk(100, rpc_options2); + CheckRpcSendOk(100, rpc_options3); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); +} + +// Tests that ring hash policy that hashes using a random value. +TEST_P(RingHashTest, NoHashPolicy) { + CreateAndStartBackends(2); + const double kDistribution50Percent = 0.5; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs); + const int request_count_1 = backends_[0]->backend_service()->request_count(); + const int request_count_2 = backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); +} + +// Test that ring hash policy evaluation will continue past the terminal +// policy if no results are produced yet. +TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) { + CreateAndStartBackends(2); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("header_not_present"); + hash_policy->set_terminal(true); + auto* hash_policy2 = route->mutable_route()->add_hash_policy(); + hash_policy2->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + CheckRpcSendOk(100, rpc_options); + EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100); + EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0); +} + +// Test random hash is used when header hashing specified a header field that +// the RPC did not have. +TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) { + CreateAndStartBackends(2); + const double kDistribution50Percent = 0.5; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("header_not_present"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"unmatched_header", absl::StrFormat("%" PRIu32, rand())}, + }; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs, rpc_options); + const int request_count_1 = backends_[0]->backend_service()->request_count(); + const int request_count_2 = backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); +} + +// Test random hash is used when only unsupported hash policies are +// configured. +TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) { + CreateAndStartBackends(2); + const double kDistribution50Percent = 0.5; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_1->mutable_cookie()->set_name("cookie"); + auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip( + true); + auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_3->mutable_query_parameter()->set_name( + "query_parameter"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs); + const int request_count_1 = backends_[0]->backend_service()->request_count(); + const int request_count_2 = backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); +} + +// Tests that ring hash policy that hashes using a random value can spread +// RPCs across all the backends according to locality weight. +TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) { + CreateAndStartBackends(2); + const size_t kWeight1 = 1; + const size_t kWeight2 = 2; + const size_t kWeightTotal = kWeight1 + kWeight2; + const double kWeight33Percent = static_cast(kWeight1) / kWeightTotal; + const double kWeight66Percent = static_cast(kWeight2) / kWeightTotal; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + EdsResourceArgs args({{"locality0", + {CreateEndpoint(0, HealthStatus::UNKNOWN, 1), + CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs); + const int weight_33_request_count = + backends_[0]->backend_service()->request_count(); + const int weight_66_request_count = + backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(weight_33_request_count) / kNumRpcs, + ::testing::DoubleNear(kWeight33Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(weight_66_request_count) / kNumRpcs, + ::testing::DoubleNear(kWeight66Percent, kErrorTolerance)); +} + +// Tests that ring hash policy that hashes using a random value can spread +// RPCs across all the backends according to locality weight. +TEST_P(RingHashTest, + RandomHashingDistributionAccordingToLocalityAndEndpointWeight) { + CreateAndStartBackends(2); + const size_t kWeight1 = 1 * 1; + const size_t kWeight2 = 2 * 2; + const size_t kWeightTotal = kWeight1 + kWeight2; + const double kWeight20Percent = static_cast(kWeight1) / kWeightTotal; + const double kWeight80Percent = static_cast(kWeight2) / kWeightTotal; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + EdsResourceArgs args( + {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1}, + {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs); + const int weight_20_request_count = + backends_[0]->backend_service()->request_count(); + const int weight_80_request_count = + backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(weight_20_request_count) / kNumRpcs, + ::testing::DoubleNear(kWeight20Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(weight_80_request_count) / kNumRpcs, + ::testing::DoubleNear(kWeight80Percent, kErrorTolerance)); +} + +// Tests that ring hash policy that hashes using a fixed string ensures all +// RPCs to go 1 particular backend; and that subsequent hashing policies are +// ignored due to the setting of terminal. +TEST_P(RingHashTest, FixedHashingTerminalPolicy) { + CreateAndStartBackends(2); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("fixed_string"); + hash_policy->set_terminal(true); + auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy(); + hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"fixed_string", "fixed_value"}, + {"random_string", absl::StrFormat("%" PRIu32, rand())}, + }; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + CheckRpcSendOk(100, rpc_options); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); +} + +// Test that the channel will go from idle to ready via connecting; +// (tho it is not possible to catch the connecting state before moving to +// ready) +TEST_P(RingHashTest, IdleToReady) { + CreateAndStartBackends(1); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + CheckRpcSendOk(); + EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false)); +} + +// Test that the channel will transition to READY once it starts +// connecting even if there are no RPCs being sent to the picker. +TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) { + // Create EDS resource. + CreateAndStartBackends(1); + auto non_existant_endpoint = MakeNonExistantEndpoint(); + EdsResourceArgs args( + {{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Change CDS resource to use RING_HASH. + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + // Add hash policy to RDS resource. + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + // A connection injector that cancels the RPC after seeing the + // connection attempt for the non-existant endpoint. + class ConnectionInjector : public ConnectionAttemptInjector { + public: + explicit ConnectionInjector(int port) : port_(port) {} + + void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) override { + { + grpc_core::MutexLock lock(&mu_); + const int port = grpc_sockaddr_get_port(addr); + gpr_log(GPR_INFO, "==> HandleConnection(): seen_port_=%d, port=%d", + seen_port_, port); + if (!seen_port_ && port == port_) { + gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT"); + seen_port_ = true; + cond_.Signal(); + } + } + AttemptConnection(closure, ep, interested_parties, channel_args, addr, + deadline); + } + + void WaitForP0ConnectionAttempt() { + grpc_core::MutexLock lock(&mu_); + while (!seen_port_) { + cond_.Wait(&mu_); + } + } + + private: + const int port_; + + grpc_core::Mutex mu_; + grpc_core::CondVar cond_; + bool seen_port_ ABSL_GUARDED_BY(mu_) = false; + }; + ConnectionInjector connection_injector(non_existant_endpoint.port); + // A long-running RPC, just used to send the RPC in another thread. + LongRunningRpc rpc; + std::vector> metadata = { + {"address_hash", + CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}}; + rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata( + std::move(metadata))); + // Wait for the RPC to trigger the P0 connection attempt, then cancel it. + connection_injector.WaitForP0ConnectionAttempt(); + rpc.CancelRpc(); + // Wait for channel to become connected without any pending RPC. + EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5))); + // RPC should have been cancelled. + EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code()); + // Make sure the backend did not get any requests. + EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count()); +} + +// Test that when the first pick is down leading to a transient failure, we +// will move on to the next ring hash entry. +TEST_P(RingHashTest, TransientFailureCheckNextOne) { + CreateAndStartBackends(1); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + std::vector endpoints; + const int unused_port = grpc_pick_unused_port_or_die(); + endpoints.emplace_back(unused_port); + endpoints.emplace_back(backends_[0]->port()); + EdsResourceArgs args({{"locality0", std::move(endpoints)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"address_hash", + CreateMetadataValueThatHashesToBackendPort(unused_port)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + CheckRpcSendOk(100, rpc_options); +} + +// Test that when a backend goes down, we will move on to the next subchannel +// (with a lower priority). When the backend comes back up, traffic will move +// back. +TEST_P(RingHashTest, SwitchToLowerPrioirtyAndThenBack) { + CreateAndStartBackends(2); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight, + 0}, + {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight, + 1}, + }); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + ShutdownBackend(0); + WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true), + rpc_options); + StartBackend(0); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + CheckRpcSendOk(100, rpc_options); + EXPECT_EQ(100, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[1]->backend_service()->request_count()); +} + +// Test that when all backends are down, we will keep reattempting. +TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) { + CreateAndStartBackends(1); + const uint32_t kConnectionTimeoutMilliseconds = 5000; + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args( + {{"locality0", {MakeNonExistantEndpoint(), CreateEndpoint(0)}}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + ShutdownBackend(0); + CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options( + RpcOptions().set_metadata(std::move(metadata)))); + StartBackend(0); + // Ensure we are actively connecting without any traffic. + EXPECT_TRUE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); +} + +// Test that when all backends are down and then up, we may pick a TF backend +// and we will then jump to ready backend. +TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) { + CreateAndStartBackends(2); + const uint32_t kConnectionTimeoutMilliseconds = 5000; + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + // Make sure we include some unused ports to fill the ring. + EdsResourceArgs args({ + {"locality0", + {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistantEndpoint(), + MakeNonExistantEndpoint()}}, + }); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + ShutdownBackend(0); + ShutdownBackend(1); + CheckRpcSendFailure( + CheckRpcSendFailureOptions().set_rpc_options(rpc_options)); + EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false)); + // Bring up 0, should be picked as the RPC is hashed to it. + StartBackend(0); + EXPECT_TRUE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + // Bring down 0 and bring up 1. + // Note the RPC contains a header value that will always be hashed to + // backend 0. So by purposely bring down backend 0 and bring up another + // backend, this will ensure Picker's first choice of backend 0 will fail + // and it will + // 1. reattempt backend 0 and + // 2. go through the remaining subchannels to find one in READY. + // Since the the entries in the ring is pretty distributed and we have + // unused ports to fill the ring, it is almost guaranteed that the Picker + // will go through some non-READY entries and skip them as per design. + ShutdownBackend(0); + CheckRpcSendFailure( + CheckRpcSendFailureOptions().set_rpc_options(rpc_options)); + StartBackend(1); + EXPECT_TRUE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); + WaitForBackend(1, WaitForBackendOptions(), rpc_options); +} + +// Test unspported hash policy types are all ignored before a supported +// policy. +TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) { + CreateAndStartBackends(2); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_1->mutable_cookie()->set_name("cookie"); + auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip( + true); + auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_3->mutable_query_parameter()->set_name( + "query_parameter"); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + CheckRpcSendOk(100); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); +} + +// Test we nack when ring hash policy has invalid hash function (something +// other than XX_HASH. +TEST_P(RingHashTest, InvalidHashFunction) { + CreateAndStartBackends(1); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->set_hash_function( + Cluster::RingHashLbConfig::MURMUR_HASH_2); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT( + response_state->error_message, + ::testing::HasSubstr("ring hash lb config has invalid hash function.")); +} + +// Test we nack when ring hash policy has invalid ring size. +TEST_P(RingHashTest, InvalidMinimumRingSize) { + CreateAndStartBackends(1); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 0); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr( + "min_ring_size is not in the range of 1 to 8388608.")); +} + +// Test we nack when ring hash policy has invalid ring size. +TEST_P(RingHashTest, InvalidMaxmumRingSize) { + CreateAndStartBackends(1); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value( + 8388609); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr( + "max_ring_size is not in the range of 1 to 8388608.")); +} + +// Test we nack when ring hash policy has invalid ring size. +TEST_P(RingHashTest, InvalidRingSizeMinGreaterThanMax) { + CreateAndStartBackends(1); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value( + 5000); + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 5001); + balancer_->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr( + "min_ring_size cannot be greater than max_ring_size.")); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + ::testing::InitGoogleTest(&argc, argv); + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); +#if TARGET_OS_IPHONE + // Workaround Apple CFStream bug + gpr_setenv("grpc_cfstream", "0"); +#endif + grpc_init(); + grpc::testing::ConnectionAttemptInjector::Init(); + const auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +}