From e7ddd7b436fde93f777f428b2cf8c0d16b98dc9a Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Sat, 16 Apr 2022 18:08:53 -0700 Subject: [PATCH] xds_end2end_test: Move aggregate and logical_dns cluster tests to their own file (#29298) * move some code around * remove num_backends parameter from XdsEnd2endTest * remove use_xds_enabled_server param from XdsEnd2endTest * remove xds_resource_does_not_exist_timeout_ms param from XdsEnd2endTest * remove client_load_reporting_interval_seconds param from XdsEnd2endTest * start moving CreateAndStartBackends() into individual tests * finish moving CreateAndStartBackends() into individual tests * remove unused variable * remove SetEdsResourceWithDelay * fix test flake * clang-tidy * clang-format * move test framework to its own library * fix build * clang-format * fix windows build * rename TestType to XdsTestType * move BackendServiceImpl inside of BackendServerThread * clang-format * move AdminServerThread to CSDS test suite * remove unnecessary deps * move aggregate and logical_dns cluster tests to their own file * split aggregate and logical_dns tests into separate suites * clang-format * re-add flaky tag * clang-tidy and remove unnecessary dep --- CMakeLists.txt | 174 +++- build_autogenerated.yaml | 59 +- test/cpp/end2end/xds/BUILD | 26 +- .../xds/xds_cluster_type_end2end_test.cc | 908 ++++++++++++++++++ test/cpp/end2end/xds/xds_end2end_test.cc | 831 +--------------- 5 files changed, 1164 insertions(+), 834 deletions(-) create mode 100644 test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 224f1f1f4d3..9582d96efff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1178,6 +1178,9 @@ if(gRPC_BUILD_TESTS) endif() add_dependencies(buildtests_cxx xds_bootstrap_test) add_dependencies(buildtests_cxx xds_certificate_provider_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx xds_cluster_type_end2end_test) + endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx xds_core_end2end_test) endif() @@ -17118,6 +17121,176 @@ target_link_libraries(xds_certificate_provider_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(xds_cluster_type_end2end_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h + test/cpp/end2end/connection_delay_injector.cc + test/cpp/end2end/test_service_impl.cc + test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc + test/cpp/end2end/xds/xds_end2end_test_lib.cc + test/cpp/end2end/xds/xds_server.cc + test/cpp/util/tls_test_utils.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(xds_cluster_type_end2end_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(xds_cluster_type_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -17693,7 +17866,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/tls.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/tls.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/tls.grpc.pb.h - test/cpp/end2end/connection_delay_injector.cc test/cpp/end2end/test_service_impl.cc test/cpp/end2end/xds/xds_end2end_test.cc test/cpp/end2end/xds/xds_end2end_test_lib.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index d1cf320abbe..ad3e3de1f4b 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -8378,6 +8378,63 @@ targets: - test/core/xds/xds_certificate_provider_test.cc deps: - grpc_test_util +- name: xds_cluster_type_end2end_test + gtest: true + build: test + run: false + language: c++ + headers: + - test/cpp/end2end/connection_delay_injector.h + - test/cpp/end2end/counted_service.h + - test/cpp/end2end/test_service_impl.h + - test/cpp/end2end/xds/xds_end2end_test_lib.h + - test/cpp/end2end/xds/xds_server.h + - test/cpp/util/tls_test_utils.h + src: + - src/proto/grpc/testing/duplicate/echo_duplicate.proto + - src/proto/grpc/testing/echo.proto + - src/proto/grpc/testing/echo_messages.proto + - src/proto/grpc/testing/simple_messages.proto + - src/proto/grpc/testing/xds/ads_for_test.proto + - src/proto/grpc/testing/xds/eds_for_test.proto + - src/proto/grpc/testing/xds/lrs_for_test.proto + - src/proto/grpc/testing/xds/v3/address.proto + - src/proto/grpc/testing/xds/v3/ads.proto + - src/proto/grpc/testing/xds/v3/aggregate_cluster.proto + - src/proto/grpc/testing/xds/v3/base.proto + - src/proto/grpc/testing/xds/v3/cluster.proto + - src/proto/grpc/testing/xds/v3/config_source.proto + - src/proto/grpc/testing/xds/v3/discovery.proto + - src/proto/grpc/testing/xds/v3/endpoint.proto + - src/proto/grpc/testing/xds/v3/expr.proto + - src/proto/grpc/testing/xds/v3/extension.proto + - src/proto/grpc/testing/xds/v3/http_connection_manager.proto + - src/proto/grpc/testing/xds/v3/http_filter_rbac.proto + - src/proto/grpc/testing/xds/v3/listener.proto + - src/proto/grpc/testing/xds/v3/load_report.proto + - src/proto/grpc/testing/xds/v3/lrs.proto + - src/proto/grpc/testing/xds/v3/metadata.proto + - src/proto/grpc/testing/xds/v3/path.proto + - src/proto/grpc/testing/xds/v3/percent.proto + - src/proto/grpc/testing/xds/v3/protocol.proto + - src/proto/grpc/testing/xds/v3/range.proto + - src/proto/grpc/testing/xds/v3/rbac.proto + - src/proto/grpc/testing/xds/v3/regex.proto + - src/proto/grpc/testing/xds/v3/route.proto + - src/proto/grpc/testing/xds/v3/router.proto + - src/proto/grpc/testing/xds/v3/string.proto + - test/cpp/end2end/connection_delay_injector.cc + - test/cpp/end2end/test_service_impl.cc + - test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc + - test/cpp/end2end/xds/xds_end2end_test_lib.cc + - test/cpp/end2end/xds/xds_server.cc + - test/cpp/util/tls_test_utils.cc + deps: + - grpc++_test_util + platforms: + - linux + - posix + - mac - name: xds_core_end2end_test gtest: true build: test @@ -8517,7 +8574,6 @@ targets: run: false language: c++ headers: - - test/cpp/end2end/connection_delay_injector.h - test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h - test/cpp/end2end/xds/no_op_http_filter.h @@ -8562,7 +8618,6 @@ targets: - src/proto/grpc/testing/xds/v3/router.proto - src/proto/grpc/testing/xds/v3/string.proto - src/proto/grpc/testing/xds/v3/tls.proto - - test/cpp/end2end/connection_delay_injector.cc - test/cpp/end2end/test_service_impl.cc - test/cpp/end2end/xds/xds_end2end_test.cc - test/cpp/end2end/xds/xds_end2end_test_lib.cc diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index acfa723dd27..90475b63e1c 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -140,13 +140,37 @@ grpc_cc_test( "//src/proto/grpc/testing/xds/v3:router_proto", "//src/proto/grpc/testing/xds/v3:tls_proto", "//test/core/util:grpc_test_util", - "//test/cpp/end2end:connection_delay_injector", "//test/cpp/util:test_config", "//test/cpp/util:test_util", "//test/cpp/util:tls_test_utils", ], ) +grpc_cc_test( + name = "xds_cluster_type_end2end_test", + size = "large", + srcs = ["xds_cluster_type_end2end_test.cc"], + external_deps = [ + "gtest", + ], + flaky = True, # TODO(b/144705388) + linkstatic = True, # Fixes dyld error on MacOS + tags = [ + "no_test_ios", + "no_windows", + ], # TODO(jtattermusch): fix test on windows + deps = [ + ":xds_end2end_test_lib", + "//:gpr", + "//:grpc", + "//:grpc++", + "//:grpc_resolver_fake", + "//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/end2end:connection_delay_injector", + ], +) + grpc_cc_test( name = "xds_core_end2end_test", size = "large", diff --git a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc new file mode 100644 index 00000000000..8bd0207de50 --- /dev/null +++ b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc @@ -0,0 +1,908 @@ +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include + +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" + +#include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/resolver/server_address.h" +#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" +#include "test/cpp/end2end/connection_delay_injector.h" +#include "test/cpp/end2end/xds/xds_end2end_test_lib.h" + +namespace grpc { +namespace testing { +namespace { + +using ::envoy::config::cluster::v3::CustomClusterType; +using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig; + +class ClusterTypeTest : public XdsEnd2endTest { + protected: + void SetUp() override { + logical_dns_cluster_resolver_response_generator_ = + grpc_core::MakeRefCounted(); + InitClient(); + ChannelArguments args; + args.SetPointerWithVtable( + GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR, + logical_dns_cluster_resolver_response_generator_.get(), + &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable); + ResetStub(/*failover_timeout_ms=*/0, &args); + } + + grpc_core::ServerAddressList CreateAddressListFromPortList( + const std::vector& ports) { + grpc_core::ServerAddressList addresses; + for (int port : ports) { + absl::StatusOr lb_uri = grpc_core::URI::Parse( + absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port)); + GPR_ASSERT(lb_uri.ok()); + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(*lb_uri, &address)); + addresses.emplace_back(address.addr, address.len, nullptr); + } + return addresses; + } + + grpc_core::RefCountedPtr + logical_dns_cluster_resolver_response_generator_; +}; + +// +// LOGICAL_DNS cluster tests +// + +using LogicalDNSClusterTest = ClusterTypeTest; + +INSTANTIATE_TEST_SUITE_P(XdsTest, LogicalDNSClusterTest, + ::testing::Values(XdsTestType()), &XdsTestType::Name); + +TEST_P(LogicalDNSClusterTest, Basic) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(1); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + auto* address = cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + address->set_address(kServerName); + address->set_port_value(443); + balancer_->ads_service()->SetCdsResource(cluster); + // Set Logical DNS result + { + grpc_core::ExecCtx exec_ctx; + grpc_core::Resolver::Result result; + result.addresses = CreateAddressListFromPortList(GetBackendPorts()); + logical_dns_cluster_resolver_response_generator_->SetResponse( + std::move(result)); + } + // RPCs should succeed. + CheckRpcSendOk(); +} + +TEST_P(LogicalDNSClusterTest, MissingLoadAssignment) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr( + "load_assignment not present for LOGICAL_DNS cluster")); +} + +TEST_P(LogicalDNSClusterTest, MissingLocalities) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT( + response_state->error_message, + ::testing::HasSubstr("load_assignment for LOGICAL_DNS cluster must have " + "exactly one locality, found 0")); +} + +TEST_P(LogicalDNSClusterTest, MultipleLocalities) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + auto* load_assignment = cluster.mutable_load_assignment(); + load_assignment->add_endpoints(); + load_assignment->add_endpoints(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT( + response_state->error_message, + ::testing::HasSubstr("load_assignment for LOGICAL_DNS cluster must have " + "exactly one locality, found 2")); +} + +TEST_P(LogicalDNSClusterTest, MissingEndpoints) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment()->add_endpoints(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr( + "locality for LOGICAL_DNS cluster must have exactly one " + "endpoint, found 0")); +} + +TEST_P(LogicalDNSClusterTest, MultipleEndpoints) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + auto* locality = cluster.mutable_load_assignment()->add_endpoints(); + locality->add_lb_endpoints(); + locality->add_lb_endpoints(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr( + "locality for LOGICAL_DNS cluster must have exactly one " + "endpoint, found 2")); +} + +TEST_P(LogicalDNSClusterTest, EmptyEndpoint) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment()->add_endpoints()->add_lb_endpoints(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("LbEndpoint endpoint field not set")); +} + +TEST_P(LogicalDNSClusterTest, EndpointMissingAddress) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("Endpoint address field not set")); +} + +TEST_P(LogicalDNSClusterTest, AddressMissingSocketAddress) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("Address socket_address field not set")); +} + +TEST_P(LogicalDNSClusterTest, SocketAddressHasResolverName) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address() + ->set_resolver_name("foo"); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("LOGICAL_DNS clusters must NOT have a " + "custom resolver name set")); +} + +TEST_P(LogicalDNSClusterTest, SocketAddressMissingAddress) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("SocketAddress address field not set")); +} + +TEST_P(LogicalDNSClusterTest, SocketAddressMissingPort) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address() + ->set_address(kServerName); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("SocketAddress port_value field not set")); +} + +// Test that CDS client should send a NACK if cluster type is Logical DNS but +// the feature is not yet supported. +TEST_P(LogicalDNSClusterTest, Disabled) { + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("DiscoveryType is not valid.")); +} + +// +// aggregate cluster tests +// + +// TODO(roth): Add tests showing that load reporting is enabled on a +// per-underlying-cluster basis. + +using AggregateClusterTest = ClusterTypeTest; + +INSTANTIATE_TEST_SUITE_P(XdsTest, AggregateClusterTest, + ::testing::Values(XdsTestType()), &XdsTestType::Name); + +TEST_P(AggregateClusterTest, ) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(2); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + // Populate new EDS resources. + EdsResourceArgs args1({ + {"locality0", CreateEndpointsForBackends(0, 1)}, + }); + EdsResourceArgs args2({ + {"locality0", CreateEndpointsForBackends(1, 2)}, + }); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsService1Name)); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args2, kNewEdsService2Name)); + // Populate new CDS resources. + Cluster new_cluster1 = default_cluster_; + new_cluster1.set_name(kNewCluster1Name); + new_cluster1.mutable_eds_cluster_config()->set_service_name( + kNewEdsService1Name); + balancer_->ads_service()->SetCdsResource(new_cluster1); + Cluster new_cluster2 = default_cluster_; + new_cluster2.set_name(kNewCluster2Name); + new_cluster2.mutable_eds_cluster_config()->set_service_name( + kNewEdsService2Name); + balancer_->ads_service()->SetCdsResource(new_cluster2); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kNewCluster1Name); + cluster_config.add_clusters(kNewCluster2Name); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Wait for traffic to go to backend 0. + WaitForBackend(0); + // Shutdown backend 0 and wait for all traffic to go to backend 1. + ShutdownBackend(0); + WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); + auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Bring backend 0 back and ensure all traffic go back to it. + StartBackend(0); + WaitForBackend(0); +} + +TEST_P(AggregateClusterTest, DiamondDependency) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + const char* kNewClusterName1 = "new_cluster_1"; + const char* kNewEdsServiceName1 = "new_eds_service_name_1"; + const char* kNewClusterName2 = "new_cluster_2"; + const char* kNewEdsServiceName2 = "new_eds_service_name_2"; + const char* kNewAggregateClusterName = "new_aggregate_cluster"; + // Populate new EDS resources. + CreateAndStartBackends(2); + EdsResourceArgs args1({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsServiceName1)); + EdsResourceArgs args2({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args2, kNewEdsServiceName2)); + // Populate new CDS resources. + Cluster new_cluster1 = default_cluster_; + new_cluster1.set_name(kNewClusterName1); + new_cluster1.mutable_eds_cluster_config()->set_service_name( + kNewEdsServiceName1); + balancer_->ads_service()->SetCdsResource(new_cluster1); + Cluster new_cluster2 = default_cluster_; + new_cluster2.set_name(kNewClusterName2); + new_cluster2.mutable_eds_cluster_config()->set_service_name( + kNewEdsServiceName2); + balancer_->ads_service()->SetCdsResource(new_cluster2); + // Populate top-level aggregate cluster pointing to kNewClusterName1 + // and kNewAggregateClusterName. + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kNewClusterName1); + cluster_config.add_clusters(kNewAggregateClusterName); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Populate kNewAggregateClusterName aggregate cluster pointing to + // kNewClusterName1 and kNewClusterName2. + auto aggregate_cluster2 = default_cluster_; + aggregate_cluster2.set_name(kNewAggregateClusterName); + custom_cluster = aggregate_cluster2.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + cluster_config.Clear(); + cluster_config.add_clusters(kNewClusterName1); + cluster_config.add_clusters(kNewClusterName2); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(aggregate_cluster2); + // Wait for traffic to go to backend 0. + WaitForBackend(0); + // Shutdown backend 0 and wait for all traffic to go to backend 1. + ShutdownBackend(0); + WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); + auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Bring backend 0 back and ensure all traffic go back to it. + StartBackend(0); + WaitForBackend(0); +} + +// This test covers a bug found in the following scenario: +// 1. P0 reports TRANSIENT_FAILURE, so we start connecting to P1. +// 2. While P1 is still in CONNECTING, P0 goes back to READY, so we +// switch back to P0, deactivating P1. +// 3. P0 then goes back to TRANSIENT_FAILURE, and we reactivate P1. +// The bug caused us to fail to choose P1 even though it is in state +// CONNECTING (because the failover timer was not running), so we +// incorrectly failed the RPCs. +TEST_P(AggregateClusterTest, FallBackWithConnectivityChurn) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(2); + const char* kClusterName1 = "cluster1"; + const char* kClusterName2 = "cluster2"; + const char* kEdsServiceName2 = "eds_service_name2"; + // Populate EDS resources. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kEdsServiceName2)); + // Populate new CDS resources. + Cluster cluster1 = default_cluster_; + cluster1.set_name(kClusterName1); + balancer_->ads_service()->SetCdsResource(cluster1); + Cluster cluster2 = default_cluster_; + cluster2.set_name(kClusterName2); + cluster2.mutable_eds_cluster_config()->set_service_name(kEdsServiceName2); + balancer_->ads_service()->SetCdsResource(cluster2); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kClusterName1); + cluster_config.add_clusters(kClusterName2); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // This class injects itself into all TCP connection attempts made + // against iomgr. It intercepts the attempts for the P0 and P1 + // backends and allows them to proceed as desired to simulate the case + // being tested. + class ConnectionInjector : public ConnectionAttemptInjector { + public: + ConnectionInjector(int p0_port, int p1_port) + : p0_port_(p0_port), p1_port_(p1_port) {} + + void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) override { + { + grpc_core::MutexLock lock(&mu_); + const int port = grpc_sockaddr_get_port(addr); + gpr_log(GPR_INFO, "==> HandleConnection(): state_=%d, port=%d", state_, + port); + switch (state_) { + case kInit: + // Make P0 report TF, which should trigger us to try to connect to + // P1. + if (port == p0_port_) { + gpr_log(GPR_INFO, "*** INJECTING FAILURE FOR P0 ENDPOINT"); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "injected connection failure")); + state_ = kP0Failed; + return; + } + break; + case kP0Failed: + // Hold connection attempt to P1 so that it stays in CONNECTING. + if (port == p1_port_) { + gpr_log(GPR_INFO, + "*** DELAYING CONNECTION ATTEMPT FOR P1 ENDPOINT"); + queued_p1_attempt_ = absl::make_unique( + closure, ep, interested_parties, channel_args, addr, + deadline); + state_ = kDone; + return; + } + break; + case kDone: + // P0 should attempt reconnection. Log it to make the test + // easier to debug, but allow it to complete, so that the + // priority policy deactivates P1. + if (port == p0_port_) { + gpr_log(GPR_INFO, + "*** INTERCEPTING CONNECTION ATTEMPT FOR P0 ENDPOINT"); + } + break; + } + } + AttemptConnection(closure, ep, interested_parties, channel_args, addr, + deadline); + } + + // Invoked by the test when the RPC to the P0 backend has succeeded + // and it's ready to allow the P1 connection attempt to proceed. + void CompletePriority1Connection() { + grpc_core::ExecCtx exec_ctx; + std::unique_ptr attempt; + { + grpc_core::MutexLock lock(&mu_); + GPR_ASSERT(state_ == kDone); + attempt = std::move(queued_p1_attempt_); + } + attempt->Resume(); + } + + private: + const int p0_port_; + const int p1_port_; + + grpc_core::Mutex mu_; + enum { + kInit, + kP0Failed, + kDone, + } state_ ABSL_GUARDED_BY(mu_) = kInit; + std::unique_ptr queued_p1_attempt_ ABSL_GUARDED_BY(mu_); + }; + ConnectionInjector connection_attempt_injector(backends_[0]->port(), + backends_[1]->port()); + // Wait for P0 backend. + // Increase timeout to account for subchannel connection delays. + WaitForBackend(0, WaitForBackendOptions(), RpcOptions().set_timeout_ms(2000)); + // Bring down the P0 backend. + ShutdownBackend(0); + // Allow the connection attempt to the P1 backend to resume. + connection_attempt_injector.CompletePriority1Connection(); + // Wait for P1 backend to start getting traffic. + WaitForBackend(1); +} + +TEST_P(AggregateClusterTest, EdsToLogicalDns) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(2); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kLogicalDNSClusterName = "logical_dns_cluster"; + // Populate new EDS resources. + EdsResourceArgs args1({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsService1Name)); + // Populate new CDS resources. + Cluster new_cluster1 = default_cluster_; + new_cluster1.set_name(kNewCluster1Name); + new_cluster1.mutable_eds_cluster_config()->set_service_name( + kNewEdsService1Name); + balancer_->ads_service()->SetCdsResource(new_cluster1); + // Create Logical DNS Cluster + auto logical_dns_cluster = default_cluster_; + logical_dns_cluster.set_name(kLogicalDNSClusterName); + logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); + auto* address = logical_dns_cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + address->set_address(kServerName); + address->set_port_value(443); + balancer_->ads_service()->SetCdsResource(logical_dns_cluster); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kNewCluster1Name); + cluster_config.add_clusters(kLogicalDNSClusterName); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Set Logical DNS result + { + grpc_core::ExecCtx exec_ctx; + grpc_core::Resolver::Result result; + result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2)); + logical_dns_cluster_resolver_response_generator_->SetResponse( + std::move(result)); + } + // Wait for traffic to go to backend 0. + WaitForBackend(0); + // Shutdown backend 0 and wait for all traffic to go to backend 1. + ShutdownBackend(0); + WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); + auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Bring backend 0 back and ensure all traffic go back to it. + StartBackend(0); + WaitForBackend(0); +} + +TEST_P(AggregateClusterTest, LogicalDnsToEds) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(2); + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + const char* kLogicalDNSClusterName = "logical_dns_cluster"; + // Populate new EDS resources. + EdsResourceArgs args2({ + {"locality0", CreateEndpointsForBackends(1, 2)}, + }); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args2, kNewEdsService2Name)); + // Populate new CDS resources. + Cluster new_cluster2 = default_cluster_; + new_cluster2.set_name(kNewCluster2Name); + new_cluster2.mutable_eds_cluster_config()->set_service_name( + kNewEdsService2Name); + balancer_->ads_service()->SetCdsResource(new_cluster2); + // Create Logical DNS Cluster + auto logical_dns_cluster = default_cluster_; + logical_dns_cluster.set_name(kLogicalDNSClusterName); + logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); + auto* address = logical_dns_cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + address->set_address(kServerName); + address->set_port_value(443); + balancer_->ads_service()->SetCdsResource(logical_dns_cluster); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kLogicalDNSClusterName); + cluster_config.add_clusters(kNewCluster2Name); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Set Logical DNS result + { + grpc_core::ExecCtx exec_ctx; + grpc_core::Resolver::Result result; + result.addresses = CreateAddressListFromPortList(GetBackendPorts(0, 1)); + logical_dns_cluster_resolver_response_generator_->SetResponse( + std::move(result)); + } + // Wait for traffic to go to backend 0. + WaitForBackend(0); + // Shutdown backend 0 and wait for all traffic to go to backend 1. + ShutdownBackend(0); + WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); + auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Bring backend 0 back and ensure all traffic go back to it. + StartBackend(0); + WaitForBackend(0); +} + +// This test covers a bug seen in the wild where the +// xds_cluster_resolver policy's code to reuse child policy names did +// not correctly handle the case where the LOGICAL_DNS priority failed, +// thus returning a priority with no localities. This caused the child +// name to be reused incorrectly, which triggered an assertion failure +// in the xds_cluster_impl policy caused by changing its cluster name. +TEST_P(AggregateClusterTest, ReconfigEdsWhileLogicalDnsChildFails) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(2); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kLogicalDNSClusterName = "logical_dns_cluster"; + // Populate EDS resource with all unreachable endpoints. + // - Priority 0: locality0 + // - Priority 1: locality1, locality2 + EdsResourceArgs args1({ + {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0}, + {"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1}, + {"locality2", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1}, + }); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsService1Name)); + // Populate new CDS resources. + Cluster new_cluster1 = default_cluster_; + new_cluster1.set_name(kNewCluster1Name); + new_cluster1.mutable_eds_cluster_config()->set_service_name( + kNewEdsService1Name); + balancer_->ads_service()->SetCdsResource(new_cluster1); + // Create Logical DNS Cluster + auto logical_dns_cluster = default_cluster_; + logical_dns_cluster.set_name(kLogicalDNSClusterName); + logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); + auto* address = logical_dns_cluster.mutable_load_assignment() + ->add_endpoints() + ->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + address->set_address(kServerName); + address->set_port_value(443); + balancer_->ads_service()->SetCdsResource(logical_dns_cluster); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kNewCluster1Name); + cluster_config.add_clusters(kLogicalDNSClusterName); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Set Logical DNS result + { + grpc_core::ExecCtx exec_ctx; + grpc_core::Resolver::Result result; + result.addresses = absl::UnavailableError("injected error"); + logical_dns_cluster_resolver_response_generator_->SetResponse( + std::move(result)); + } + // When an RPC fails, we know the channel has seen the update. + CheckRpcSendFailure(); + // Send an EDS update that moves locality1 to priority 0. + args1 = EdsResourceArgs({ + {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight, + 0}, + {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight, + 1}, + }); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsService1Name)); + WaitForBackend(0, WaitForBackendOptions().set_allow_failures(true)); +} + +TEST_P(AggregateClusterTest, MultipleClustersWithSameLocalities) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + CreateAndStartBackends(2); + const char* kNewClusterName1 = "new_cluster_1"; + const char* kNewEdsServiceName1 = "new_eds_service_name_1"; + const char* kNewClusterName2 = "new_cluster_2"; + const char* kNewEdsServiceName2 = "new_eds_service_name_2"; + // Populate EDS resource for cluster 1 with unreachable endpoint. + EdsResourceArgs args1({{"locality0", {MakeNonExistantEndpoint()}}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsServiceName1)); + // Populate CDS resource for cluster 1. + Cluster new_cluster1 = default_cluster_; + new_cluster1.set_name(kNewClusterName1); + new_cluster1.mutable_eds_cluster_config()->set_service_name( + kNewEdsServiceName1); + balancer_->ads_service()->SetCdsResource(new_cluster1); + // Populate EDS resource for cluster 2. + args1 = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsServiceName2)); + // Populate CDS resource for cluster 2. + Cluster new_cluster2 = default_cluster_; + new_cluster2.set_name(kNewClusterName2); + new_cluster2.mutable_eds_cluster_config()->set_service_name( + kNewEdsServiceName2); + balancer_->ads_service()->SetCdsResource(new_cluster2); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kNewClusterName1); + cluster_config.add_clusters(kNewClusterName2); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + // Wait for channel to get the resources and get connected. + WaitForBackend(0); + // Send an EDS update for cluster 1 that reuses the locality name from + // cluster 1 and points traffic to backend 1. + args1 = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsServiceName1)); + WaitForBackend(1); +} + +TEST_P(AggregateClusterTest, RecursionDepthJustBelowMax) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Populate EDS resource. + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Populate new CDS resource. + Cluster new_cluster = default_cluster_; + new_cluster.set_name(absl::StrCat(kDefaultClusterName, 15)); + balancer_->ads_service()->SetCdsResource(new_cluster); + // Populate aggregate cluster chain. + for (int i = 14; i >= 0; --i) { + auto cluster = default_cluster_; + if (i > 0) cluster.set_name(absl::StrCat(kDefaultClusterName, i)); + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(absl::StrCat(kDefaultClusterName, i + 1)); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + } + // RPCs should fail with the right status. + CheckRpcSendOk(); +} + +TEST_P(AggregateClusterTest, RecursionMaxDepth) { + ScopedExperimentalEnvVar env_var( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + // Populate EDS resource. + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Populate new CDS resource. + Cluster new_cluster = default_cluster_; + new_cluster.set_name(absl::StrCat(kDefaultClusterName, 16)); + balancer_->ads_service()->SetCdsResource(new_cluster); + // Populate aggregate cluster chain. + for (int i = 15; i >= 0; --i) { + auto cluster = default_cluster_; + if (i > 0) cluster.set_name(absl::StrCat(kDefaultClusterName, i)); + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(absl::StrCat(kDefaultClusterName, i + 1)); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancer_->ads_service()->SetCdsResource(cluster); + } + // RPCs should fail with the right status. + const Status status = SendRpc(); + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); + EXPECT_THAT( + status.error_message(), + ::testing::HasSubstr("aggregate cluster graph exceeds max depth")); +} + +// Test that CDS client should send a NACK if cluster type is AGGREGATE but +// the feature is not yet supported. +TEST_P(AggregateClusterTest, Disabled) { + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters("cluster1"); + cluster_config.add_clusters("cluster2"); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + cluster.set_type(Cluster::LOGICAL_DNS); + balancer_->ads_service()->SetCdsResource(cluster); + const auto response_state = WaitForCdsNack(); + ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; + EXPECT_THAT(response_state->error_message, + ::testing::HasSubstr("DiscoveryType is not valid.")); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + ::testing::InitGoogleTest(&argc, argv); + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); +#if TARGET_OS_IPHONE + // Workaround Apple CFStream bug + gpr_setenv("grpc_cfstream", "0"); +#endif + grpc_init(); + grpc::testing::ConnectionAttemptInjector::Init(); + const auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/cpp/end2end/xds/xds_end2end_test.cc b/test/cpp/end2end/xds/xds_end2end_test.cc index 63b9ef1b152..c47d4f550ab 100644 --- a/test/cpp/end2end/xds/xds_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_end2end_test.cc @@ -105,7 +105,6 @@ #include "src/proto/grpc/testing/xds/v3/tls.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "test/cpp/end2end/connection_delay_injector.h" #include "test/cpp/end2end/xds/no_op_http_filter.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" #include "test/cpp/util/test_config.h" @@ -116,7 +115,6 @@ namespace testing { namespace { using ::envoy::config::cluster::v3::CircuitBreakers; -using ::envoy::config::cluster::v3::CustomClusterType; using ::envoy::config::cluster::v3::RoutingPriority; using ::envoy::config::endpoint::v3::HealthStatus; using ::envoy::config::listener::v3::FilterChainMatch; @@ -125,7 +123,6 @@ using ::envoy::config::rbac::v3::RBAC_Action; using ::envoy::config::rbac::v3::RBAC_Action_ALLOW; using ::envoy::config::rbac::v3::RBAC_Action_DENY; using ::envoy::config::rbac::v3::RBAC_Action_LOG; -using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig; using ::envoy::extensions::filters::http::rbac::v3::RBAC; using ::envoy::extensions::filters::http::rbac::v3::RBACPerRoute; using ::envoy::extensions::transport_sockets::tls::v3::DownstreamTlsContext; @@ -624,37 +621,7 @@ TEST_P(XdsResolverOnlyTest, KeepUsingLastDataIfBalancerGoesDown) { WaitForBackend(1); } -class CdsTest : public XdsEnd2endTest { - protected: - void SetUp() override { - logical_dns_cluster_resolver_response_generator_ = - grpc_core::MakeRefCounted(); - InitClient(); - ChannelArguments args; - args.SetPointerWithVtable( - GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR, - logical_dns_cluster_resolver_response_generator_.get(), - &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable); - ResetStub(/*failover_timeout_ms=*/0, &args); - } - - grpc_core::ServerAddressList CreateAddressListFromPortList( - const std::vector& ports) { - grpc_core::ServerAddressList addresses; - for (int port : ports) { - absl::StatusOr lb_uri = grpc_core::URI::Parse( - absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port)); - GPR_ASSERT(lb_uri.ok()); - grpc_resolved_address address; - GPR_ASSERT(grpc_parse_uri(*lb_uri, &address)); - addresses.emplace_back(address.addr, address.len, nullptr); - } - return addresses; - } - - grpc_core::RefCountedPtr - logical_dns_cluster_resolver_response_generator_; -}; +using CdsTest = XdsEnd2endTest; // Tests that CDS client should send an ACK upon correct CDS response. TEST_P(CdsTest, Vanilla) { @@ -664,801 +631,6 @@ TEST_P(CdsTest, Vanilla) { EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); } -TEST_P(CdsTest, LogicalDNSClusterType) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(1); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - auto* address = cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address(); - address->set_address(kServerName); - address->set_port_value(443); - balancer_->ads_service()->SetCdsResource(cluster); - // Set Logical DNS result - { - grpc_core::ExecCtx exec_ctx; - grpc_core::Resolver::Result result; - result.addresses = CreateAddressListFromPortList(GetBackendPorts()); - logical_dns_cluster_resolver_response_generator_->SetResponse( - std::move(result)); - } - // RPCs should succeed. - CheckRpcSendOk(); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeMissingLoadAssignment) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr( - "load_assignment not present for LOGICAL_DNS cluster")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeMissingLocalities) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT( - response_state->error_message, - ::testing::HasSubstr("load_assignment for LOGICAL_DNS cluster must have " - "exactly one locality, found 0")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeMultipleLocalities) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - auto* load_assignment = cluster.mutable_load_assignment(); - load_assignment->add_endpoints(); - load_assignment->add_endpoints(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT( - response_state->error_message, - ::testing::HasSubstr("load_assignment for LOGICAL_DNS cluster must have " - "exactly one locality, found 2")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeMissingEndpoints) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment()->add_endpoints(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr( - "locality for LOGICAL_DNS cluster must have exactly one " - "endpoint, found 0")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeMultipleEndpoints) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - auto* locality = cluster.mutable_load_assignment()->add_endpoints(); - locality->add_lb_endpoints(); - locality->add_lb_endpoints(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr( - "locality for LOGICAL_DNS cluster must have exactly one " - "endpoint, found 2")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeEmptyEndpoint) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment()->add_endpoints()->add_lb_endpoints(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("LbEndpoint endpoint field not set")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeEndpointMissingAddress) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("Endpoint address field not set")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeAddressMissingSocketAddress) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("Address socket_address field not set")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeSocketAddressHasResolverName) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address() - ->set_resolver_name("foo"); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("LOGICAL_DNS clusters must NOT have a " - "custom resolver name set")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeSocketAddressMissingAddress) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address(); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("SocketAddress address field not set")); -} - -TEST_P(CdsTest, LogicalDNSClusterTypeSocketAddressMissingPort) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Create Logical DNS Cluster - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address() - ->set_address(kServerName); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("SocketAddress port_value field not set")); -} - -TEST_P(CdsTest, AggregateClusterType) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(2); - const char* kNewCluster1Name = "new_cluster_1"; - const char* kNewEdsService1Name = "new_eds_service_name_1"; - const char* kNewCluster2Name = "new_cluster_2"; - const char* kNewEdsService2Name = "new_eds_service_name_2"; - // Populate new EDS resources. - EdsResourceArgs args1({ - {"locality0", CreateEndpointsForBackends(0, 1)}, - }); - EdsResourceArgs args2({ - {"locality0", CreateEndpointsForBackends(1, 2)}, - }); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsService1Name)); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args2, kNewEdsService2Name)); - // Populate new CDS resources. - Cluster new_cluster1 = default_cluster_; - new_cluster1.set_name(kNewCluster1Name); - new_cluster1.mutable_eds_cluster_config()->set_service_name( - kNewEdsService1Name); - balancer_->ads_service()->SetCdsResource(new_cluster1); - Cluster new_cluster2 = default_cluster_; - new_cluster2.set_name(kNewCluster2Name); - new_cluster2.mutable_eds_cluster_config()->set_service_name( - kNewEdsService2Name); - balancer_->ads_service()->SetCdsResource(new_cluster2); - // Create Aggregate Cluster - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kNewCluster1Name); - cluster_config.add_clusters(kNewCluster2Name); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Wait for traffic to go to backend 0. - WaitForBackend(0); - // Shutdown backend 0 and wait for all traffic to go to backend 1. - ShutdownBackend(0); - WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); - auto response_state = balancer_->ads_service()->cds_response_state(); - ASSERT_TRUE(response_state.has_value()); - EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); - // Bring backend 0 back and ensure all traffic go back to it. - StartBackend(0); - WaitForBackend(0); -} - -TEST_P(CdsTest, AggregateClusterDiamondDependency) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - const char* kNewClusterName1 = "new_cluster_1"; - const char* kNewEdsServiceName1 = "new_eds_service_name_1"; - const char* kNewClusterName2 = "new_cluster_2"; - const char* kNewEdsServiceName2 = "new_eds_service_name_2"; - const char* kNewAggregateClusterName = "new_aggregate_cluster"; - // Populate new EDS resources. - CreateAndStartBackends(2); - EdsResourceArgs args1({{"locality0", CreateEndpointsForBackends(0, 1)}}); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsServiceName1)); - EdsResourceArgs args2({{"locality0", CreateEndpointsForBackends(1, 2)}}); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args2, kNewEdsServiceName2)); - // Populate new CDS resources. - Cluster new_cluster1 = default_cluster_; - new_cluster1.set_name(kNewClusterName1); - new_cluster1.mutable_eds_cluster_config()->set_service_name( - kNewEdsServiceName1); - balancer_->ads_service()->SetCdsResource(new_cluster1); - Cluster new_cluster2 = default_cluster_; - new_cluster2.set_name(kNewClusterName2); - new_cluster2.mutable_eds_cluster_config()->set_service_name( - kNewEdsServiceName2); - balancer_->ads_service()->SetCdsResource(new_cluster2); - // Populate top-level aggregate cluster pointing to kNewClusterName1 - // and kNewAggregateClusterName. - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kNewClusterName1); - cluster_config.add_clusters(kNewAggregateClusterName); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Populate kNewAggregateClusterName aggregate cluster pointing to - // kNewClusterName1 and kNewClusterName2. - auto aggregate_cluster2 = default_cluster_; - aggregate_cluster2.set_name(kNewAggregateClusterName); - custom_cluster = aggregate_cluster2.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - cluster_config.Clear(); - cluster_config.add_clusters(kNewClusterName1); - cluster_config.add_clusters(kNewClusterName2); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(aggregate_cluster2); - // Wait for traffic to go to backend 0. - WaitForBackend(0); - // Shutdown backend 0 and wait for all traffic to go to backend 1. - ShutdownBackend(0); - WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); - auto response_state = balancer_->ads_service()->cds_response_state(); - ASSERT_TRUE(response_state.has_value()); - EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); - // Bring backend 0 back and ensure all traffic go back to it. - StartBackend(0); - WaitForBackend(0); -} - -// This test covers a bug found in the following scenario: -// 1. P0 reports TRANSIENT_FAILURE, so we start connecting to P1. -// 2. While P1 is still in CONNECTING, P0 goes back to READY, so we -// switch back to P0, deactivating P1. -// 3. P0 then goes back to TRANSIENT_FAILURE, and we reactivate P1. -// The bug caused us to fail to choose P1 even though it is in state -// CONNECTING (because the failover timer was not running), so we -// incorrectly failed the RPCs. -TEST_P(CdsTest, AggregateClusterFallBackWithConnectivityChurn) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(2); - const char* kClusterName1 = "cluster1"; - const char* kClusterName2 = "cluster2"; - const char* kEdsServiceName2 = "eds_service_name2"; - // Populate EDS resources. - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}}); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args, kEdsServiceName2)); - // Populate new CDS resources. - Cluster cluster1 = default_cluster_; - cluster1.set_name(kClusterName1); - balancer_->ads_service()->SetCdsResource(cluster1); - Cluster cluster2 = default_cluster_; - cluster2.set_name(kClusterName2); - cluster2.mutable_eds_cluster_config()->set_service_name(kEdsServiceName2); - balancer_->ads_service()->SetCdsResource(cluster2); - // Create Aggregate Cluster - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kClusterName1); - cluster_config.add_clusters(kClusterName2); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // This class injects itself into all TCP connection attempts made - // against iomgr. It intercepts the attempts for the P0 and P1 - // backends and allows them to proceed as desired to simulate the case - // being tested. - class ConnectionInjector : public ConnectionAttemptInjector { - public: - ConnectionInjector(int p0_port, int p1_port) - : p0_port_(p0_port), p1_port_(p1_port) {} - - void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) override { - { - grpc_core::MutexLock lock(&mu_); - const int port = grpc_sockaddr_get_port(addr); - gpr_log(GPR_INFO, "==> HandleConnection(): state_=%d, port=%d", state_, - port); - switch (state_) { - case kInit: - // Make P0 report TF, which should trigger us to try to connect to - // P1. - if (port == p0_port_) { - gpr_log(GPR_INFO, "*** INJECTING FAILURE FOR P0 ENDPOINT"); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "injected connection failure")); - state_ = kP0Failed; - return; - } - break; - case kP0Failed: - // Hold connection attempt to P1 so that it stays in CONNECTING. - if (port == p1_port_) { - gpr_log(GPR_INFO, - "*** DELAYING CONNECTION ATTEMPT FOR P1 ENDPOINT"); - queued_p1_attempt_ = absl::make_unique( - closure, ep, interested_parties, channel_args, addr, - deadline); - state_ = kDone; - return; - } - break; - case kDone: - // P0 should attempt reconnection. Log it to make the test - // easier to debug, but allow it to complete, so that the - // priority policy deactivates P1. - if (port == p0_port_) { - gpr_log(GPR_INFO, - "*** INTERCEPTING CONNECTION ATTEMPT FOR P0 ENDPOINT"); - } - break; - } - } - AttemptConnection(closure, ep, interested_parties, channel_args, addr, - deadline); - } - - // Invoked by the test when the RPC to the P0 backend has succeeded - // and it's ready to allow the P1 connection attempt to proceed. - void CompletePriority1Connection() { - grpc_core::ExecCtx exec_ctx; - std::unique_ptr attempt; - { - grpc_core::MutexLock lock(&mu_); - GPR_ASSERT(state_ == kDone); - attempt = std::move(queued_p1_attempt_); - } - attempt->Resume(); - } - - private: - const int p0_port_; - const int p1_port_; - - grpc_core::Mutex mu_; - enum { - kInit, - kP0Failed, - kDone, - } state_ ABSL_GUARDED_BY(mu_) = kInit; - std::unique_ptr queued_p1_attempt_ ABSL_GUARDED_BY(mu_); - }; - ConnectionInjector connection_attempt_injector(backends_[0]->port(), - backends_[1]->port()); - // Wait for P0 backend. - // Increase timeout to account for subchannel connection delays. - WaitForBackend(0, WaitForBackendOptions(), RpcOptions().set_timeout_ms(2000)); - // Bring down the P0 backend. - ShutdownBackend(0); - // Allow the connection attempt to the P1 backend to resume. - connection_attempt_injector.CompletePriority1Connection(); - // Wait for P1 backend to start getting traffic. - WaitForBackend(1); -} - -TEST_P(CdsTest, AggregateClusterEdsToLogicalDns) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(2); - const char* kNewCluster1Name = "new_cluster_1"; - const char* kNewEdsService1Name = "new_eds_service_name_1"; - const char* kLogicalDNSClusterName = "logical_dns_cluster"; - // Populate new EDS resources. - EdsResourceArgs args1({{"locality0", CreateEndpointsForBackends(0, 1)}}); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsService1Name)); - // Populate new CDS resources. - Cluster new_cluster1 = default_cluster_; - new_cluster1.set_name(kNewCluster1Name); - new_cluster1.mutable_eds_cluster_config()->set_service_name( - kNewEdsService1Name); - balancer_->ads_service()->SetCdsResource(new_cluster1); - // Create Logical DNS Cluster - auto logical_dns_cluster = default_cluster_; - logical_dns_cluster.set_name(kLogicalDNSClusterName); - logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); - auto* address = logical_dns_cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address(); - address->set_address(kServerName); - address->set_port_value(443); - balancer_->ads_service()->SetCdsResource(logical_dns_cluster); - // Create Aggregate Cluster - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kNewCluster1Name); - cluster_config.add_clusters(kLogicalDNSClusterName); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Set Logical DNS result - { - grpc_core::ExecCtx exec_ctx; - grpc_core::Resolver::Result result; - result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2)); - logical_dns_cluster_resolver_response_generator_->SetResponse( - std::move(result)); - } - // Wait for traffic to go to backend 0. - WaitForBackend(0); - // Shutdown backend 0 and wait for all traffic to go to backend 1. - ShutdownBackend(0); - WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); - auto response_state = balancer_->ads_service()->cds_response_state(); - ASSERT_TRUE(response_state.has_value()); - EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); - // Bring backend 0 back and ensure all traffic go back to it. - StartBackend(0); - WaitForBackend(0); -} - -TEST_P(CdsTest, AggregateClusterLogicalDnsToEds) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(2); - const char* kNewCluster2Name = "new_cluster_2"; - const char* kNewEdsService2Name = "new_eds_service_name_2"; - const char* kLogicalDNSClusterName = "logical_dns_cluster"; - // Populate new EDS resources. - EdsResourceArgs args2({ - {"locality0", CreateEndpointsForBackends(1, 2)}, - }); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args2, kNewEdsService2Name)); - // Populate new CDS resources. - Cluster new_cluster2 = default_cluster_; - new_cluster2.set_name(kNewCluster2Name); - new_cluster2.mutable_eds_cluster_config()->set_service_name( - kNewEdsService2Name); - balancer_->ads_service()->SetCdsResource(new_cluster2); - // Create Logical DNS Cluster - auto logical_dns_cluster = default_cluster_; - logical_dns_cluster.set_name(kLogicalDNSClusterName); - logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); - auto* address = logical_dns_cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address(); - address->set_address(kServerName); - address->set_port_value(443); - balancer_->ads_service()->SetCdsResource(logical_dns_cluster); - // Create Aggregate Cluster - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kLogicalDNSClusterName); - cluster_config.add_clusters(kNewCluster2Name); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Set Logical DNS result - { - grpc_core::ExecCtx exec_ctx; - grpc_core::Resolver::Result result; - result.addresses = CreateAddressListFromPortList(GetBackendPorts(0, 1)); - logical_dns_cluster_resolver_response_generator_->SetResponse( - std::move(result)); - } - // Wait for traffic to go to backend 0. - WaitForBackend(0); - // Shutdown backend 0 and wait for all traffic to go to backend 1. - ShutdownBackend(0); - WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true)); - auto response_state = balancer_->ads_service()->cds_response_state(); - ASSERT_TRUE(response_state.has_value()); - EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); - // Bring backend 0 back and ensure all traffic go back to it. - StartBackend(0); - WaitForBackend(0); -} - -// This test covers a bug seen in the wild where the -// xds_cluster_resolver policy's code to reuse child policy names did -// not correctly handle the case where the LOGICAL_DNS priority failed, -// thus returning a priority with no localities. This caused the child -// name to be reused incorrectly, which triggered an assertion failure -// in the xds_cluster_impl policy caused by changing its cluster name. -TEST_P(CdsTest, AggregateClusterReconfigEdsWhileLogicalDnsChildFails) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(2); - const char* kNewCluster1Name = "new_cluster_1"; - const char* kNewEdsService1Name = "new_eds_service_name_1"; - const char* kLogicalDNSClusterName = "logical_dns_cluster"; - // Populate EDS resource with all unreachable endpoints. - // - Priority 0: locality0 - // - Priority 1: locality1, locality2 - EdsResourceArgs args1({ - {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0}, - {"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1}, - {"locality2", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1}, - }); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsService1Name)); - // Populate new CDS resources. - Cluster new_cluster1 = default_cluster_; - new_cluster1.set_name(kNewCluster1Name); - new_cluster1.mutable_eds_cluster_config()->set_service_name( - kNewEdsService1Name); - balancer_->ads_service()->SetCdsResource(new_cluster1); - // Create Logical DNS Cluster - auto logical_dns_cluster = default_cluster_; - logical_dns_cluster.set_name(kLogicalDNSClusterName); - logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); - auto* address = logical_dns_cluster.mutable_load_assignment() - ->add_endpoints() - ->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address(); - address->set_address(kServerName); - address->set_port_value(443); - balancer_->ads_service()->SetCdsResource(logical_dns_cluster); - // Create Aggregate Cluster - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kNewCluster1Name); - cluster_config.add_clusters(kLogicalDNSClusterName); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Set Logical DNS result - { - grpc_core::ExecCtx exec_ctx; - grpc_core::Resolver::Result result; - result.addresses = absl::UnavailableError("injected error"); - logical_dns_cluster_resolver_response_generator_->SetResponse( - std::move(result)); - } - // When an RPC fails, we know the channel has seen the update. - CheckRpcSendFailure(); - // Send an EDS update that moves locality1 to priority 0. - args1 = EdsResourceArgs({ - {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight, - 0}, - {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight, - 1}, - }); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsService1Name)); - WaitForBackend(0, WaitForBackendOptions().set_allow_failures(true)); -} - -TEST_P(CdsTest, AggregateClusterMultipleClustersWithSameLocalities) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - CreateAndStartBackends(2); - const char* kNewClusterName1 = "new_cluster_1"; - const char* kNewEdsServiceName1 = "new_eds_service_name_1"; - const char* kNewClusterName2 = "new_cluster_2"; - const char* kNewEdsServiceName2 = "new_eds_service_name_2"; - // Populate EDS resource for cluster 1 with unreachable endpoint. - EdsResourceArgs args1({{"locality0", {MakeNonExistantEndpoint()}}}); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsServiceName1)); - // Populate CDS resource for cluster 1. - Cluster new_cluster1 = default_cluster_; - new_cluster1.set_name(kNewClusterName1); - new_cluster1.mutable_eds_cluster_config()->set_service_name( - kNewEdsServiceName1); - balancer_->ads_service()->SetCdsResource(new_cluster1); - // Populate EDS resource for cluster 2. - args1 = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(0, 1)}}); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsServiceName2)); - // Populate CDS resource for cluster 2. - Cluster new_cluster2 = default_cluster_; - new_cluster2.set_name(kNewClusterName2); - new_cluster2.mutable_eds_cluster_config()->set_service_name( - kNewEdsServiceName2); - balancer_->ads_service()->SetCdsResource(new_cluster2); - // Create Aggregate Cluster - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(kNewClusterName1); - cluster_config.add_clusters(kNewClusterName2); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - // Wait for channel to get the resources and get connected. - WaitForBackend(0); - // Send an EDS update for cluster 1 that reuses the locality name from - // cluster 1 and points traffic to backend 1. - args1 = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}}); - balancer_->ads_service()->SetEdsResource( - BuildEdsResource(args1, kNewEdsServiceName1)); - WaitForBackend(1); -} - -TEST_P(CdsTest, AggregateClusterRecursionDepthJustBelowMax) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Populate EDS resource. - CreateAndStartBackends(1); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // Populate new CDS resource. - Cluster new_cluster = default_cluster_; - new_cluster.set_name(absl::StrCat(kDefaultClusterName, 15)); - balancer_->ads_service()->SetCdsResource(new_cluster); - // Populate aggregate cluster chain. - for (int i = 14; i >= 0; --i) { - auto cluster = default_cluster_; - if (i > 0) cluster.set_name(absl::StrCat(kDefaultClusterName, i)); - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(absl::StrCat(kDefaultClusterName, i + 1)); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - } - // RPCs should fail with the right status. - CheckRpcSendOk(); -} - -TEST_P(CdsTest, AggregateClusterRecursionMaxDepth) { - ScopedExperimentalEnvVar env_var( - "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); - // Populate EDS resource. - CreateAndStartBackends(1); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // Populate new CDS resource. - Cluster new_cluster = default_cluster_; - new_cluster.set_name(absl::StrCat(kDefaultClusterName, 16)); - balancer_->ads_service()->SetCdsResource(new_cluster); - // Populate aggregate cluster chain. - for (int i = 15; i >= 0; --i) { - auto cluster = default_cluster_; - if (i > 0) cluster.set_name(absl::StrCat(kDefaultClusterName, i)); - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters(absl::StrCat(kDefaultClusterName, i + 1)); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - balancer_->ads_service()->SetCdsResource(cluster); - } - // RPCs should fail with the right status. - const Status status = SendRpc(); - EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); - EXPECT_THAT( - status.error_message(), - ::testing::HasSubstr("aggregate cluster graph exceeds max depth")); -} - -// Test that CDS client should send a NACK if cluster type is Logical DNS but -// the feature is not yet supported. -TEST_P(CdsTest, LogicalDNSClusterTypeDisabled) { - auto cluster = default_cluster_; - cluster.set_type(Cluster::LOGICAL_DNS); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("DiscoveryType is not valid.")); -} - -// Test that CDS client should send a NACK if cluster type is AGGREGATE but -// the feature is not yet supported. -TEST_P(CdsTest, AggregateClusterTypeDisabled) { - auto cluster = default_cluster_; - CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); - custom_cluster->set_name("envoy.clusters.aggregate"); - ClusterConfig cluster_config; - cluster_config.add_clusters("cluster1"); - cluster_config.add_clusters("cluster2"); - custom_cluster->mutable_typed_config()->PackFrom(cluster_config); - cluster.set_type(Cluster::LOGICAL_DNS); - balancer_->ads_service()->SetCdsResource(cluster); - const auto response_state = WaitForCdsNack(); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::HasSubstr("DiscoveryType is not valid.")); -} - // Tests that CDS client should send a NACK if the cluster type in CDS // response is unsupported. TEST_P(CdsTest, UnsupportedClusterType) { @@ -6359,7 +5531,6 @@ int main(int argc, char** argv) { absl::make_unique( "fake2", grpc::testing::g_fake2_cert_data_map)); grpc_init(); - grpc::testing::ConnectionAttemptInjector::Init(); grpc_core::XdsHttpFilterRegistry::RegisterFilter( absl::make_unique( "grpc.testing.client_only_http_filter",