From 19b1f8f64ac723bc46d70fbf4638c3f7e335b5f5 Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Thu, 21 Jan 2021 14:25:50 -0800 Subject: [PATCH] Cds Watchers changes to support aggregate Cds and Logical DNS Cds. --- BUILD | 6 + BUILD.gn | 4 + CMakeLists.txt | 9 + Makefile | 4 + build_autogenerated.yaml | 5 + config.m4 | 4 + config.w32 | 8 + gRPC-C++.podspec | 4 + gRPC-Core.podspec | 6 + grpc.gemspec | 4 + grpc.gyp | 2 + package.xml | 4 + .../client_channel/lb_policy/xds/cds.cc | 373 ++++++++++++------ .../lb_policy/xds/xds_channel_args.h | 5 + .../lb_policy/xds/xds_cluster_resolver.cc | 49 ++- .../clusters/aggregate/v3/cluster.upb.c | 29 ++ .../clusters/aggregate/v3/cluster.upb.h | 67 ++++ .../clusters/aggregate/v3/cluster.upbdefs.c | 51 +++ .../clusters/aggregate/v3/cluster.upbdefs.h | 35 ++ src/core/ext/xds/xds_api.cc | 102 ++++- src/core/ext/xds/xds_api.h | 10 +- src/proto/grpc/testing/xds/cds_for_test.proto | 12 + src/proto/grpc/testing/xds/v3/BUILD | 11 + .../testing/xds/v3/aggregate_cluster.proto | 30 ++ src/proto/grpc/testing/xds/v3/cluster.proto | 14 + src/python/grpcio/grpc_core_dependencies.py | 2 + test/cpp/end2end/BUILD | 1 + test/cpp/end2end/xds_end2end_test.cc | 204 +++++++++- tools/codegen/core/gen_upb_api.sh | 1 + tools/doxygen/Doxyfile.c++.internal | 4 + tools/doxygen/Doxyfile.core.internal | 4 + 31 files changed, 904 insertions(+), 160 deletions(-) create mode 100644 src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c create mode 100644 src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h create mode 100644 src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c create mode 100644 src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h create mode 100644 src/proto/grpc/testing/xds/v3/aggregate_cluster.proto diff --git a/BUILD b/BUILD index f20a2e6d177..fea26e8b7f1 100644 --- a/BUILD +++ b/BUILD @@ -1479,7 +1479,9 @@ grpc_cc_library( "grpc_base", "grpc_client_channel", "grpc_lb_address_filtering", + "grpc_lb_xds_channel_args", "grpc_lb_xds_common", + "grpc_resolver_fake", "grpc_xds_client", ], ) @@ -2628,6 +2630,7 @@ grpc_cc_library( "src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c", "src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c", "src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c", + "src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c", "src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c", "src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c", "src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c", @@ -2660,6 +2663,7 @@ grpc_cc_library( "src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h", "src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h", "src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h", + "src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h", "src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h", "src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h", "src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h", @@ -2709,6 +2713,7 @@ grpc_cc_library( "src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c", "src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c", "src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c", + "src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c", "src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c", "src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c", "src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c", @@ -2740,6 +2745,7 @@ grpc_cc_library( "src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h", + "src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h", diff --git a/BUILD.gn b/BUILD.gn index 472d2cac4c4..9d74bc96ebd 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -457,6 +457,8 @@ config("grpc_config") { "src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h", "src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c", "src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h", + "src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c", + "src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h", "src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c", "src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h", "src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c", @@ -627,6 +629,8 @@ config("grpc_config") { "src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c", "src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h", + "src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c", + "src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c", "src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h", "src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c", diff --git a/CMakeLists.txt b/CMakeLists.txt index 2ec1f050aaa..3bc286a1e3f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -457,6 +457,9 @@ protobuf_generate_grpc_cpp( protobuf_generate_grpc_cpp( src/proto/grpc/testing/xds/v3/ads.proto ) +protobuf_generate_grpc_cpp( + src/proto/grpc/testing/xds/v3/aggregate_cluster.proto +) protobuf_generate_grpc_cpp( src/proto/grpc/testing/xds/v3/base.proto ) @@ -1582,6 +1585,7 @@ add_library(grpc src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c + src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c @@ -1667,6 +1671,7 @@ add_library(grpc src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c + src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c @@ -15616,6 +15621,10 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h diff --git a/Makefile b/Makefile index d895d2a6256..b502df37498 100644 --- a/Makefile +++ b/Makefile @@ -1169,6 +1169,7 @@ LIBGRPC_SRC = \ src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c \ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \ src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \ + src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \ src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \ src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \ src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c \ @@ -1254,6 +1255,7 @@ LIBGRPC_SRC = \ src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \ + src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c \ @@ -2703,6 +2705,7 @@ src/core/ext/upb-generated/envoy/config/route/v3/route.upb.c: $(OPENSSL_DEP) src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c: $(OPENSSL_DEP) src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c: $(OPENSSL_DEP) src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c: $(OPENSSL_DEP) +src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c: $(OPENSSL_DEP) src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c: $(OPENSSL_DEP) src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c: $(OPENSSL_DEP) src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c: $(OPENSSL_DEP) @@ -2773,6 +2776,7 @@ src/core/ext/upbdefs-generated/envoy/config/route/v3/route.upbdefs.c: $(OPENSSL_ src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c: $(OPENSSL_DEP) src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c: $(OPENSSL_DEP) src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c: $(OPENSSL_DEP) +src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c: $(OPENSSL_DEP) src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c: $(OPENSSL_DEP) src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c: $(OPENSSL_DEP) src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 52bd7c0731a..30dfb8ba93f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -497,6 +497,7 @@ libs: - src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h - src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h - src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h + - src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h - src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h - src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h - src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h @@ -582,6 +583,7 @@ libs: - src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h - src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h - src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h + - src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h - src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h - src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h - src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h @@ -1000,6 +1002,7 @@ libs: - src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c - src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c - src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c + - src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c - src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c - src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c - src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c @@ -1085,6 +1088,7 @@ libs: - src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c - src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c - src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c + - src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c - src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c - src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c - src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c @@ -7984,6 +7988,7 @@ targets: - src/proto/grpc/testing/xds/lrs_for_test.proto - src/proto/grpc/testing/xds/v3/address.proto - src/proto/grpc/testing/xds/v3/ads.proto + - src/proto/grpc/testing/xds/v3/aggregate_cluster.proto - src/proto/grpc/testing/xds/v3/base.proto - src/proto/grpc/testing/xds/v3/cluster.proto - src/proto/grpc/testing/xds/v3/config_source.proto diff --git a/config.m4 b/config.m4 index 601d4dc929b..62b31b58b23 100644 --- a/config.m4 +++ b/config.m4 @@ -175,6 +175,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c \ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \ src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \ + src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \ src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \ src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \ src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c \ @@ -261,6 +262,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \ + src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c \ @@ -1046,6 +1048,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/config/rbac/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/config/route/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/config/trace/v3) + PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/service/cluster/v3) @@ -1077,6 +1080,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/config/listener/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/config/route/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/config/trace/v3) + PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/service/cluster/v3) diff --git a/config.w32 b/config.w32 index 0497b183453..43d2eb8cd5a 100644 --- a/config.w32 +++ b/config.w32 @@ -142,6 +142,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\upb-generated\\envoy\\config\\route\\v3\\route_components.upb.c " + "src\\core\\ext\\upb-generated\\envoy\\config\\route\\v3\\scoped_route.upb.c " + "src\\core\\ext\\upb-generated\\envoy\\config\\trace\\v3\\http_tracer.upb.c " + + "src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters\\aggregate\\v3\\cluster.upb.c " + "src\\core\\ext\\upb-generated\\envoy\\extensions\\filters\\network\\http_connection_manager\\v3\\http_connection_manager.upb.c " + "src\\core\\ext\\upb-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\cert.upb.c " + "src\\core\\ext\\upb-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\common.upb.c " + @@ -228,6 +229,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\upbdefs-generated\\envoy\\config\\route\\v3\\route_components.upbdefs.c " + "src\\core\\ext\\upbdefs-generated\\envoy\\config\\route\\v3\\scoped_route.upbdefs.c " + "src\\core\\ext\\upbdefs-generated\\envoy\\config\\trace\\v3\\http_tracer.upbdefs.c " + + "src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters\\aggregate\\v3\\cluster.upbdefs.c " + "src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters\\network\\http_connection_manager\\v3\\http_connection_manager.upbdefs.c " + "src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\cert.upbdefs.c " + "src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\common.upbdefs.c " + @@ -1061,6 +1063,9 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\config\\trace"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\config\\trace\\v3"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters\\aggregate"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters\\aggregate\\v3"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\filters"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\filters\\network"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\filters\\network\\http_connection_manager"); @@ -1130,6 +1135,9 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\config\\trace"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\config\\trace\\v3"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters\\aggregate"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters\\aggregate\\v3"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters\\network"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters\\network\\http_connection_manager"); diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 00986eae727..13dd35ed019 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -314,6 +314,7 @@ Pod::Spec.new do |s| 'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h', 'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h', 'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h', + 'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h', 'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h', @@ -400,6 +401,7 @@ Pod::Spec.new do |s| 'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h', + 'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h', @@ -932,6 +934,7 @@ Pod::Spec.new do |s| 'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h', 'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h', 'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h', + 'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h', 'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h', @@ -1018,6 +1021,7 @@ Pod::Spec.new do |s| 'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h', + 'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 00736e45f0a..ae4ef7c1c14 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -439,6 +439,8 @@ Pod::Spec.new do |s| 'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h', 'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c', 'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h', + 'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c', + 'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h', 'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c', 'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c', @@ -611,6 +613,8 @@ Pod::Spec.new do |s| 'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h', + 'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c', + 'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c', @@ -1464,6 +1468,7 @@ Pod::Spec.new do |s| 'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h', 'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h', 'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h', + 'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h', 'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h', @@ -1550,6 +1555,7 @@ Pod::Spec.new do |s| 'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h', + 'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h', diff --git a/grpc.gemspec b/grpc.gemspec index ea6ea4c2bd7..733ad0886da 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -354,6 +354,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h ) s.files += %w( src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c ) s.files += %w( src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h ) + s.files += %w( src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c ) + s.files += %w( src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h ) s.files += %w( src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c ) s.files += %w( src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h ) s.files += %w( src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c ) @@ -526,6 +528,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h ) s.files += %w( src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c ) s.files += %w( src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h ) + s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c ) + s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h ) s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c ) s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h ) s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c ) diff --git a/grpc.gyp b/grpc.gyp index 3cdbca6e8a0..e4d1c9f20d2 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -588,6 +588,7 @@ 'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c', 'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c', 'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c', + 'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c', 'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c', @@ -673,6 +674,7 @@ 'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c', + 'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c', diff --git a/package.xml b/package.xml index 0dda7e37f4b..7b9def2a615 100644 --- a/package.xml +++ b/package.xml @@ -334,6 +334,8 @@ + + @@ -506,6 +508,8 @@ + + diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 538279f5971..f246ae2ebf4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -68,21 +68,25 @@ class CdsLb : public LoadBalancingPolicy { // Watcher for getting cluster data from XdsClient. class ClusterWatcher : public XdsClient::ClusterWatcherInterface { public: - explicit ClusterWatcher(RefCountedPtr parent) - : parent_(std::move(parent)) {} + ClusterWatcher(RefCountedPtr parent, std::string name) + : parent_(std::move(parent)), name_(std::move(name)) {} void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override { - new Notifier(parent_, std::move(cluster_data)); + new Notifier(parent_, name_, std::move(cluster_data)); } - void OnError(grpc_error* error) override { new Notifier(parent_, error); } - void OnResourceDoesNotExist() override { new Notifier(parent_); } + void OnError(grpc_error* error) override { + new Notifier(parent_, name_, error); + } + void OnResourceDoesNotExist() override { new Notifier(parent_, name_); } private: class Notifier { public: - Notifier(RefCountedPtr parent, XdsApi::CdsUpdate update); - Notifier(RefCountedPtr parent, grpc_error* error); - explicit Notifier(RefCountedPtr parent); + Notifier(RefCountedPtr parent, std::string name, + XdsApi::CdsUpdate update); + Notifier(RefCountedPtr parent, std::string name, + grpc_error* error); + explicit Notifier(RefCountedPtr parent, std::string name); private: enum Type { kUpdate, kError, kDoesNotExist }; @@ -91,12 +95,22 @@ class CdsLb : public LoadBalancingPolicy { void RunInWorkSerializer(grpc_error* error); RefCountedPtr parent_; + std::string name_; grpc_closure closure_; XdsApi::CdsUpdate update_; Type type_; }; RefCountedPtr parent_; + std::string name_; + }; + + struct WatcherState { + // Pointer to watcher, to be used when cancelling. + // Not owned, so do not dereference. + ClusterWatcher* watcher = nullptr; + // Most recent update obtained from this watcher. + absl::optional update; }; // Delegating helper to be passed to child policy. @@ -119,12 +133,20 @@ class CdsLb : public LoadBalancingPolicy { void ShutdownLocked() override; - void OnClusterChanged(XdsApi::CdsUpdate cluster_data); - void OnError(grpc_error* error); - void OnResourceDoesNotExist(); + bool GenerateDiscoveryMechanismForCluster( + const std::string& name, Json::Array* discovery_mechanisms, + std::set* clusters_needed); + void OnClusterChanged(const std::string& name, + XdsApi::CdsUpdate cluster_data); + void OnError(const std::string& name, grpc_error* error); + void OnResourceDoesNotExist(const std::string& name); grpc_error* UpdateXdsCertificateProvider( - const XdsApi::CdsUpdate& cluster_data); + const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data); + + void CancelClusterDataWatch(absl::string_view cluster_name, + XdsClient::ClusterWatcherInterface* watcher, + bool delay_unsubscription = false); void MaybeDestroyChildPolicyLocked(); @@ -135,9 +157,10 @@ class CdsLb : public LoadBalancingPolicy { // The xds client. RefCountedPtr xds_client_; - // A pointer to the cluster watcher, to be used when cancelling the watch. - // Note that this is not owned, so this pointer must never be derefernced. - ClusterWatcher* cluster_watcher_ = nullptr; + + // Maps from cluster name to the state for that cluster. + // The root of the tree is config_->cluster(). + std::map watchers_; RefCountedPtr root_certificate_provider_; RefCountedPtr identity_certificate_provider_; @@ -155,21 +178,26 @@ class CdsLb : public LoadBalancingPolicy { // CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, + std::string name, XdsApi::CdsUpdate update) - : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) { + : parent_(std::move(parent)), + name_(std::move(name)), + update_(std::move(update)), + type_(kUpdate) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, - grpc_error* error) - : parent_(std::move(parent)), type_(kError) { + std::string name, grpc_error* error) + : parent_(std::move(parent)), name_(std::move(name)), type_(kError) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, error); } -CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent) - : parent_(std::move(parent)), type_(kDoesNotExist) { +CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, + std::string name) + : parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } @@ -185,13 +213,13 @@ void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg, void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(grpc_error* error) { switch (type_) { case kUpdate: - parent_->OnClusterChanged(std::move(update_)); + parent_->OnClusterChanged(name_, std::move(update_)); break; case kError: - parent_->OnError(error); + parent_->OnError(name_, error); break; case kDoesNotExist: - parent_->OnResourceDoesNotExist(); + parent_->OnResourceDoesNotExist(name_); break; }; delete this; @@ -261,13 +289,15 @@ void CdsLb::ShutdownLocked() { shutting_down_ = true; MaybeDestroyChildPolicyLocked(); if (xds_client_ != nullptr) { - if (cluster_watcher_ != nullptr) { + for (auto& watcher : watchers_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, - config_->cluster().c_str()); + watcher.first.c_str()); } - xds_client_->CancelClusterDataWatch(config_->cluster(), cluster_watcher_); + CancelClusterDataWatch(watcher.first, watcher.second.watcher, + /*delay_unsubscription=*/false); } + watchers_.clear(); xds_client_.reset(DEBUG_LOCATION, "CdsLb"); } grpc_channel_args_destroy(args_); @@ -301,119 +331,203 @@ void CdsLb::UpdateLocked(UpdateArgs args) { // If cluster name changed, cancel watcher and restart. if (old_config == nullptr || old_config->cluster() != config_->cluster()) { if (old_config != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, - old_config->cluster().c_str()); + for (auto& watcher : watchers_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, + watcher.first.c_str()); + } + CancelClusterDataWatch(watcher.first, watcher.second.watcher, + /*delay_unsubscription=*/true); } - xds_client_->CancelClusterDataWatch(old_config->cluster(), - cluster_watcher_, - /*delay_unsubscription=*/true); - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, - config_->cluster().c_str()); + watchers_.clear(); } - auto watcher = absl::make_unique(Ref()); - cluster_watcher_ = watcher.get(); + auto watcher = absl::make_unique(Ref(), config_->cluster()); + watchers_[config_->cluster()].watcher = watcher.get(); xds_client_->WatchClusterData(config_->cluster(), std::move(watcher)); } } -void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client %p: %s", - this, xds_client_.get(), cluster_data.ToString().c_str()); +// This method will attempt to generate one or multiple entries of discovery +// mechanism recursively: +// For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be +// generated cluster name, type and other data from the CdsUpdate inserted into +// the entry and the entry appended to the array of entries. +// Note, discovery mechanism entry can be generated if an CdsUpdate is +// available; otherwise, just return false. For cluster type AGGREGATE, +// recursively call the method for each child cluster. +bool CdsLb::GenerateDiscoveryMechanismForCluster( + const std::string& name, Json::Array* discovery_mechanisms, + std::set* clusters_needed) { + clusters_needed->insert(name); + auto& state = watchers_[name]; + // Create a new watcher if needed. + if (state.watcher == nullptr) { + auto watcher = absl::make_unique(Ref(), name); + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, + name.c_str()); + } + state.watcher = watcher.get(); + xds_client_->WatchClusterData(name, std::move(watcher)); + return false; } - grpc_error* error = GRPC_ERROR_NONE; - error = UpdateXdsCertificateProvider(cluster_data); - if (error != GRPC_ERROR_NONE) { - return OnError(error); + // Don't have the update we need yet. + if (!state.update.has_value()) return false; + // For AGGREGATE clusters, recursively expand to child clusters. + if (state.update->cluster_type == XdsApi::CdsUpdate::ClusterType::AGGREGATE) { + bool missing_cluster = false; + for (const std::string& child_name : + state.update->prioritized_cluster_names) { + if (!GenerateDiscoveryMechanismForCluster( + child_name, discovery_mechanisms, clusters_needed)) { + missing_cluster = true; + } + } + return !missing_cluster; } - // Construct config for child policy. - Json::Object discovery_mechanism = { - {"clusterName", config_->cluster()}, - {"max_concurrent_requests", cluster_data.max_concurrent_requests}, - {"type", "EDS"}, + std::string type; + switch (state.update->cluster_type) { + case XdsApi::CdsUpdate::ClusterType::EDS: + type = "EDS"; + break; + case XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS: + type = "LOGICAL_DNS"; + break; + default: + GPR_ASSERT(0); + break; + } + Json::Object mechanism = { + {"clusterName", name}, + {"max_concurrent_requests", state.update->max_concurrent_requests}, + {"type", std::move(type)}, }; - if (!cluster_data.eds_service_name.empty()) { - discovery_mechanism["edsServiceName"] = cluster_data.eds_service_name; + if (!state.update->eds_service_name.empty()) { + mechanism["edsServiceName"] = state.update->eds_service_name; } - if (cluster_data.lrs_load_reporting_server_name.has_value()) { - discovery_mechanism["lrsLoadReportingServerName"] = - cluster_data.lrs_load_reporting_server_name.value(); + if (state.update->lrs_load_reporting_server_name.has_value()) { + mechanism["lrsLoadReportingServerName"] = + state.update->lrs_load_reporting_server_name.value(); } - Json::Object child_config = { - {"discoveryMechanisms", - Json::Array{ - discovery_mechanism, - }}, - {"localityPickingPolicy", - Json::Array{ - Json::Object{ - {"weighted_target_experimental", - Json::Object{ - {"targets", Json::Object()}, - }}, - }, - }}, - {"endpointPickingPolicy", - Json::Array{ - Json::Object{ - {"round_robin", Json::Object()}, - }, - }}, - }; - Json json = Json::Array{ - Json::Object{ - {"xds_cluster_resolver_experimental", std::move(child_config)}, - }, - }; + discovery_mechanisms->emplace_back(std::move(mechanism)); + return true; +} + +void CdsLb::OnClusterChanged(const std::string& name, + XdsApi::CdsUpdate cluster_data) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - std::string json_str = json.Dump(/*indent=*/1); - gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this, - json_str.c_str()); + gpr_log( + GPR_INFO, + "[cdslb %p] received CDS update for cluster %s from xds client %p: %s", + this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str()); } - RefCountedPtr config = - LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); + // Store the update in the map if we are still interested in watching this + // cluster (i.e., it is not cancelled already). + // If we've already deleted this entry, then this is an update notification + // that was scheduled before the deletion, so we can just ignore it. + auto it = watchers_.find(name); + if (it == watchers_.end()) return; + it->second.update = std::move(cluster_data); + // Take care of integration with new certificate code. + grpc_error* error = GRPC_ERROR_NONE; + error = UpdateXdsCertificateProvider(name, it->second.update.value()); if (error != GRPC_ERROR_NONE) { - OnError(error); - return; + return OnError(name, error); } - // Create child policy if not already present. - if (child_policy_ == nullptr) { - LoadBalancingPolicy::Args args; - args.work_serializer = work_serializer(); - args.args = args_; - args.channel_control_helper = absl::make_unique(Ref()); - child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - config->name(), std::move(args)); - if (child_policy_ == nullptr) { - OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "failed to create child policy")); + // Scan the map starting from the root cluster to generate the list of + // discovery mechanisms. If we don't have some of the data we need (i.e., we + // just started up and not all watchers have returned data yet), then don't + // update the child policy at all. + Json::Array discovery_mechanisms; + std::set clusters_needed; + if (GenerateDiscoveryMechanismForCluster( + config_->cluster(), &discovery_mechanisms, &clusters_needed)) { + // Construct config for child policy. + Json::Object child_config = { + {"localityPickingPolicy", + Json::Array{ + Json::Object{ + {"weighted_target_experimental", + Json::Object{ + {"targets", Json::Object()}, + }}, + }, + }}, + {"endpointPickingPolicy", + Json::Array{ + Json::Object{ + {"round_robin", Json::Object()}, + }, + }}, + {"discoveryMechanisms", std::move(discovery_mechanisms)}, + }; + Json json = Json::Array{ + Json::Object{ + {"xds_cluster_resolver_experimental", std::move(child_config)}, + }, + }; + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + std::string json_str = json.Dump(/*indent=*/1); + gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", + this, json_str.c_str()); + } + RefCountedPtr config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); + if (error != GRPC_ERROR_NONE) { + OnError(name, error); return; } - grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), - interested_parties()); - if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this, - config->name(), child_policy_.get()); + // Create child policy if not already present. + if (child_policy_ == nullptr) { + LoadBalancingPolicy::Args args; + args.work_serializer = work_serializer(); + args.args = args_; + args.channel_control_helper = absl::make_unique(Ref()); + child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + config->name(), std::move(args)); + if (child_policy_ == nullptr) { + OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "failed to create child policy")); + return; + } + grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), + interested_parties()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this, + config->name(), child_policy_.get()); + } + } + // Update child policy. + UpdateArgs args; + args.config = std::move(config); + if (xds_certificate_provider_ != nullptr) { + grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg(); + args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1); + } else { + args.args = grpc_channel_args_copy(args_); } + child_policy_->UpdateLocked(std::move(args)); } - // Update child policy. - UpdateArgs args; - args.config = std::move(config); - if (xds_certificate_provider_ != nullptr) { - grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg(); - args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1); - } else { - args.args = grpc_channel_args_copy(args_); + // Remove entries in watchers_ for any clusters not in clusters_needed + for (auto it = watchers_.begin(); it != watchers_.end();) { + const std::string& cluster_name = it->first; + if (clusters_needed.find(cluster_name) != clusters_needed.end()) { + ++it; + continue; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, + cluster_name.c_str()); + } + CancelClusterDataWatch(cluster_name, it->second.watcher, + /*delay_unsubscription=*/false); + it = watchers_.erase(it); } - child_policy_->UpdateLocked(std::move(args)); } -void CdsLb::OnError(grpc_error* error) { +void CdsLb::OnError(const std::string& name, grpc_error* error) { gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s", - this, config_->cluster().c_str(), grpc_error_string(error)); + this, name.c_str(), grpc_error_string(error)); // Go into TRANSIENT_FAILURE if we have not yet created the child // policy (i.e., we have not yet received data from xds). Otherwise, // we keep running with the data we had previously. @@ -426,11 +540,11 @@ void CdsLb::OnError(grpc_error* error) { } } -void CdsLb::OnResourceDoesNotExist() { +void CdsLb::OnResourceDoesNotExist(const std::string& name) { gpr_log(GPR_ERROR, "[cdslb %p] CDS resource for %s does not exist -- reporting " "TRANSIENT_FAILURE", - this, config_->cluster().c_str()); + this, name.c_str()); grpc_error* error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("CDS resource \"", config_->cluster(), @@ -444,7 +558,7 @@ void CdsLb::OnResourceDoesNotExist() { } grpc_error* CdsLb::UpdateXdsCertificateProvider( - const XdsApi::CdsUpdate& cluster_data) { + const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data) { // Early out if channel is not configured to use xds security. grpc_channel_credentials* channel_credentials = grpc_channel_credentials_find_in_args(args_); @@ -490,7 +604,7 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider( root_certificate_provider_ = std::move(new_root_provider); } xds_certificate_provider_->UpdateRootCertNameAndDistributor( - config_->cluster(), root_provider_cert_name, + cluster_name, root_provider_cert_name, root_certificate_provider_ == nullptr ? nullptr : root_certificate_provider_->distributor()); @@ -528,7 +642,7 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider( identity_certificate_provider_ = std::move(new_identity_provider); } xds_certificate_provider_->UpdateIdentityCertNameAndDistributor( - config_->cluster(), identity_provider_cert_name, + cluster_name, identity_provider_cert_name, identity_certificate_provider_ == nullptr ? nullptr : identity_certificate_provider_->distributor()); @@ -537,10 +651,24 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider( cluster_data.common_tls_context.combined_validation_context .default_validation_context.match_subject_alt_names; xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers( - config_->cluster(), match_subject_alt_names); + cluster_name, match_subject_alt_names); return GRPC_ERROR_NONE; } +void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name, + XdsClient::ClusterWatcherInterface* watcher, + bool delay_unsubscription) { + if (xds_certificate_provider_ != nullptr) { + std::string name(cluster_name); + xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "", + nullptr); + xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "", + nullptr); + xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {}); + } + xds_client_->CancelClusterDataWatch(cluster_name, watcher, + delay_unsubscription); +} // // factory // @@ -575,6 +703,7 @@ class CdsLbFactory : public LoadBalancingPolicyFactory { return nullptr; } std::vector error_list; + // cluster name. std::string cluster; auto it = json.object_value().find("cluster"); if (it == json.object_value().end()) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h index dd64ea7f77c..2351502c8ce 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h @@ -21,4 +21,9 @@ // Set by xds_cluster_impl LB policy and used by GoogleDefaultCredentials. #define GRPC_ARG_XDS_CLUSTER_NAME "grpc.internal.xds_cluster_name" +// For testing purpose, this channel arg indicating xds_cluster_resolver LB +// policy should use the fake DNS resolver to resolve logical dns cluster. +#define GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR \ + "grpc.internal.xds_logical_dns_cluster_fake_resolver_response_generator" + #endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_ARGS_H diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 7087ec2562e..ddb310d93e6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -29,8 +29,10 @@ #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/xds/xds_channel_args.h" @@ -469,11 +471,26 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: // void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() { + std::string target = parent()->server_name_; + grpc_channel_args* args = nullptr; + FakeResolverResponseGenerator* fake_resolver_response_generator = + grpc_channel_args_find_pointer( + parent()->args_, + GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR); + if (fake_resolver_response_generator != nullptr) { + target = absl::StrCat("fake:", target); + grpc_arg new_arg = FakeResolverResponseGenerator::MakeChannelArg( + fake_resolver_response_generator); + args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1); + } else { + args = grpc_channel_args_copy(parent()->args_); + } resolver_ = ResolverRegistry::CreateResolver( - parent()->server_name_.c_str(), parent()->args_, - grpc_pollset_set_create(), parent()->work_serializer(), + target.c_str(), args, parent()->interested_parties(), + parent()->work_serializer(), absl::make_unique( Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"))); + grpc_channel_args_destroy(args); if (resolver_ == nullptr) { parent()->OnResourceDoesNotExist(index()); return; @@ -509,9 +526,11 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: XdsApi::EdsUpdate update; XdsApi::EdsUpdate::Priority::Locality locality; locality.name = MakeRefCounted("", "", ""); + locality.lb_weight = 1; locality.endpoints = std::move(result.addresses); - update.priorities[0].localities.emplace(locality.name.get(), - std::move(locality)); + XdsApi::EdsUpdate::Priority priority; + priority.localities.emplace(locality.name.get(), std::move(locality)); + update.priorities.emplace_back(std::move(priority)); discovery_mechanism_->parent()->OnEndpointChanged( discovery_mechanism_->index(), std::move(update)); } @@ -825,23 +844,13 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { Json::Object priority_children; Json::Array priority_priorities; // Setting up index to iterate through the discovery mechanisms and keeping - // track the discovery_mechanism each prioirty belongs to. + // track the discovery_mechanism each priority belongs to. size_t discovery_index = 0; // Setting up num_priorities_remaining to track the priorities in each // discovery_mechanism. size_t num_priorities_remaining_in_discovery = discovery_mechanisms_[discovery_index].num_priorities; for (size_t priority = 0; priority < priority_list_.size(); ++priority) { - // Each prioirty in the priority_list_ should correspond to a priority in a - // discovery mechanism in discovery_mechanisms_ (both in the same order). - // Keeping track of the discovery_mechanism each prioirty belongs to. - if (num_priorities_remaining_in_discovery == 0) { - ++discovery_index; - num_priorities_remaining_in_discovery = - discovery_mechanisms_[discovery_index].num_priorities; - } else { - --num_priorities_remaining_in_discovery; - } const auto& localities = priority_list_[priority].localities; Json::Object weighted_targets; for (const auto& p : localities) { @@ -913,6 +922,16 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { {"config", std::move(locality_picking_policy)}, {"ignore_reresolution_requests", true}, }; + // Each priority in the priority_list_ should correspond to a priority in a + // discovery mechanism in discovery_mechanisms_ (both in the same order). + // Keeping track of the discovery_mechanism each priority belongs to. + --num_priorities_remaining_in_discovery; + while (num_priorities_remaining_in_discovery == 0 && + discovery_index < discovery_mechanisms_.size() - 1) { + ++discovery_index; + num_priorities_remaining_in_discovery = + discovery_mechanisms_[discovery_index].num_priorities; + } } // There should be matching number of priorities in discovery_mechanisms_ and // in priority_list_; therefore at the end of looping through all the diff --git a/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c b/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c new file mode 100644 index 00000000000..308e8b53388 --- /dev/null +++ b/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c @@ -0,0 +1,29 @@ +/* This file was generated by upbc (the upb compiler) from the input + * file: + * + * envoy/extensions/clusters/aggregate/v3/cluster.proto + * + * Do not edit -- your changes will be discarded when the file is + * regenerated. */ + +#include +#include "upb/msg.h" +#include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h" +#include "udpa/annotations/status.upb.h" +#include "udpa/annotations/versioning.upb.h" +#include "validate/validate.upb.h" + +#include "upb/port_def.inc" + +static const upb_msglayout_field envoy_extensions_clusters_aggregate_v3_ClusterConfig__fields[1] = { + {1, UPB_SIZE(0, 0), 0, 0, 9, 3}, +}; + +const upb_msglayout envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit = { + NULL, + &envoy_extensions_clusters_aggregate_v3_ClusterConfig__fields[0], + UPB_SIZE(8, 8), 1, false, 255, +}; + +#include "upb/port_undef.inc" + diff --git a/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h b/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h new file mode 100644 index 00000000000..ffa4ccbca29 --- /dev/null +++ b/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h @@ -0,0 +1,67 @@ +/* This file was generated by upbc (the upb compiler) from the input + * file: + * + * envoy/extensions/clusters/aggregate/v3/cluster.proto + * + * Do not edit -- your changes will be discarded when the file is + * regenerated. */ + +#ifndef ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPB_H_ +#define ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPB_H_ + +#include "upb/msg.h" +#include "upb/decode.h" +#include "upb/decode_fast.h" +#include "upb/encode.h" + +#include "upb/port_def.inc" + +#ifdef __cplusplus +extern "C" { +#endif + +struct envoy_extensions_clusters_aggregate_v3_ClusterConfig; +typedef struct envoy_extensions_clusters_aggregate_v3_ClusterConfig envoy_extensions_clusters_aggregate_v3_ClusterConfig; +extern const upb_msglayout envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit; + + +/* envoy.extensions.clusters.aggregate.v3.ClusterConfig */ + +UPB_INLINE envoy_extensions_clusters_aggregate_v3_ClusterConfig *envoy_extensions_clusters_aggregate_v3_ClusterConfig_new(upb_arena *arena) { + return (envoy_extensions_clusters_aggregate_v3_ClusterConfig *)_upb_msg_new(&envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena); +} +UPB_INLINE envoy_extensions_clusters_aggregate_v3_ClusterConfig *envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(const char *buf, size_t size, + upb_arena *arena) { + envoy_extensions_clusters_aggregate_v3_ClusterConfig *ret = envoy_extensions_clusters_aggregate_v3_ClusterConfig_new(arena); + return (ret && upb_decode(buf, size, ret, &envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena)) ? ret : NULL; +} +UPB_INLINE envoy_extensions_clusters_aggregate_v3_ClusterConfig *envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse_ex(const char *buf, size_t size, + upb_arena *arena, int options) { + envoy_extensions_clusters_aggregate_v3_ClusterConfig *ret = envoy_extensions_clusters_aggregate_v3_ClusterConfig_new(arena); + return (ret && _upb_decode(buf, size, ret, &envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena, options)) + ? ret : NULL; +} +UPB_INLINE char *envoy_extensions_clusters_aggregate_v3_ClusterConfig_serialize(const envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, upb_arena *arena, size_t *len) { + return upb_encode(msg, &envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena, len); +} + +UPB_INLINE upb_strview const* envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(const envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, size_t *len) { return (upb_strview const*)_upb_array_accessor(msg, UPB_SIZE(0, 0), len); } + +UPB_INLINE upb_strview* envoy_extensions_clusters_aggregate_v3_ClusterConfig_mutable_clusters(envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, size_t *len) { + return (upb_strview*)_upb_array_mutable_accessor(msg, UPB_SIZE(0, 0), len); +} +UPB_INLINE upb_strview* envoy_extensions_clusters_aggregate_v3_ClusterConfig_resize_clusters(envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, size_t len, upb_arena *arena) { + return (upb_strview*)_upb_array_resize_accessor2(msg, UPB_SIZE(0, 0), len, UPB_SIZE(3, 4), arena); +} +UPB_INLINE bool envoy_extensions_clusters_aggregate_v3_ClusterConfig_add_clusters(envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, upb_strview val, upb_arena *arena) { + return _upb_array_append_accessor2(msg, UPB_SIZE(0, 0), UPB_SIZE(3, 4), &val, + arena); +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#include "upb/port_undef.inc" + +#endif /* ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPB_H_ */ diff --git a/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c b/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c new file mode 100644 index 00000000000..d159a9920d9 --- /dev/null +++ b/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c @@ -0,0 +1,51 @@ +/* This file was generated by upbc (the upb compiler) from the input + * file: + * + * envoy/extensions/clusters/aggregate/v3/cluster.proto + * + * Do not edit -- your changes will be discarded when the file is + * regenerated. */ + +#include "upb/def.h" +#include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h" + +extern upb_def_init udpa_annotations_status_proto_upbdefinit; +extern upb_def_init udpa_annotations_versioning_proto_upbdefinit; +extern upb_def_init validate_validate_proto_upbdefinit; +extern const upb_msglayout envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit; + +static const upb_msglayout *layouts[1] = { + &envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, +}; + +static const char descriptor[389] = {'\n', '4', 'e', 'n', 'v', 'o', 'y', '/', 'e', 'x', 't', 'e', 'n', 's', 'i', 'o', 'n', 's', '/', 'c', 'l', 'u', 's', 't', 'e', +'r', 's', '/', 'a', 'g', 'g', 'r', 'e', 'g', 'a', 't', 'e', '/', 'v', '3', '/', 'c', 'l', 'u', 's', 't', 'e', 'r', '.', 'p', +'r', 'o', 't', 'o', '\022', '&', 'e', 'n', 'v', 'o', 'y', '.', 'e', 'x', 't', 'e', 'n', 's', 'i', 'o', 'n', 's', '.', 'c', 'l', +'u', 's', 't', 'e', 'r', 's', '.', 'a', 'g', 'g', 'r', 'e', 'g', 'a', 't', 'e', '.', 'v', '3', '\032', '\035', 'u', 'd', 'p', 'a', +'/', 'a', 'n', 'n', 'o', 't', 'a', 't', 'i', 'o', 'n', 's', '/', 's', 't', 'a', 't', 'u', 's', '.', 'p', 'r', 'o', 't', 'o', +'\032', '!', 'u', 'd', 'p', 'a', '/', 'a', 'n', 'n', 'o', 't', 'a', 't', 'i', 'o', 'n', 's', '/', 'v', 'e', 'r', 's', 'i', 'o', +'n', 'i', 'n', 'g', '.', 'p', 'r', 'o', 't', 'o', '\032', '\027', 'v', 'a', 'l', 'i', 'd', 'a', 't', 'e', '/', 'v', 'a', 'l', 'i', +'d', 'a', 't', 'e', '.', 'p', 'r', 'o', 't', 'o', '\"', 'r', '\n', '\r', 'C', 'l', 'u', 's', 't', 'e', 'r', 'C', 'o', 'n', 'f', +'i', 'g', '\022', '$', '\n', '\010', 'c', 'l', 'u', 's', 't', 'e', 'r', 's', '\030', '\001', ' ', '\003', '(', '\t', 'B', '\010', '\372', 'B', '\005', +'\222', '\001', '\002', '\010', '\001', 'R', '\010', 'c', 'l', 'u', 's', 't', 'e', 'r', 's', ':', ';', '\232', '\305', '\210', '\036', '6', '\n', '4', 'e', +'n', 'v', 'o', 'y', '.', 'c', 'o', 'n', 'f', 'i', 'g', '.', 'c', 'l', 'u', 's', 't', 'e', 'r', '.', 'a', 'g', 'g', 'r', 'e', +'g', 'a', 't', 'e', '.', 'v', '2', 'a', 'l', 'p', 'h', 'a', '.', 'C', 'l', 'u', 's', 't', 'e', 'r', 'C', 'o', 'n', 'f', 'i', +'g', 'B', 'N', '\n', '4', 'i', 'o', '.', 'e', 'n', 'v', 'o', 'y', 'p', 'r', 'o', 'x', 'y', '.', 'e', 'n', 'v', 'o', 'y', '.', +'e', 'x', 't', 'e', 'n', 's', 'i', 'o', 'n', 's', '.', 'c', 'l', 'u', 's', 't', 'e', 'r', 's', '.', 'a', 'g', 'g', 'r', 'e', +'g', 'a', 't', 'e', '.', 'v', '3', 'B', '\014', 'C', 'l', 'u', 's', 't', 'e', 'r', 'P', 'r', 'o', 't', 'o', 'P', '\001', '\272', '\200', +'\310', '\321', '\006', '\002', '\020', '\002', 'b', '\006', 'p', 'r', 'o', 't', 'o', '3', +}; + +static upb_def_init *deps[4] = { + &udpa_annotations_status_proto_upbdefinit, + &udpa_annotations_versioning_proto_upbdefinit, + &validate_validate_proto_upbdefinit, + NULL +}; + +upb_def_init envoy_extensions_clusters_aggregate_v3_cluster_proto_upbdefinit = { + deps, + layouts, + "envoy/extensions/clusters/aggregate/v3/cluster.proto", + UPB_STRVIEW_INIT(descriptor, 389) +}; diff --git a/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h b/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h new file mode 100644 index 00000000000..fd6d1b87c83 --- /dev/null +++ b/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h @@ -0,0 +1,35 @@ +/* This file was generated by upbc (the upb compiler) from the input + * file: + * + * envoy/extensions/clusters/aggregate/v3/cluster.proto + * + * Do not edit -- your changes will be discarded when the file is + * regenerated. */ + +#ifndef ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPBDEFS_H_ +#define ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPBDEFS_H_ + +#include "upb/def.h" +#include "upb/port_def.inc" +#ifdef __cplusplus +extern "C" { +#endif + +#include "upb/def.h" + +#include "upb/port_def.inc" + +extern upb_def_init envoy_extensions_clusters_aggregate_v3_cluster_proto_upbdefinit; + +UPB_INLINE const upb_msgdef *envoy_extensions_clusters_aggregate_v3_ClusterConfig_getmsgdef(upb_symtab *s) { + _upb_symtab_loaddefinit(s, &envoy_extensions_clusters_aggregate_v3_cluster_proto_upbdefinit); + return upb_symtab_lookupmsg(s, "envoy.extensions.clusters.aggregate.v3.ClusterConfig"); +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#include "upb/port_undef.inc" + +#endif /* ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPBDEFS_H_ */ diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 816b34e96e3..2a26d3692ea 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -62,6 +62,7 @@ #include "envoy/config/route/v3/route.upb.h" #include "envoy/config/route/v3/route.upbdefs.h" #include "envoy/config/route/v3/route_components.upb.h" +#include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h" #include "envoy/extensions/transport_sockets/tls/v3/common.upb.h" #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h" @@ -101,6 +102,19 @@ bool XdsTimeoutEnabled() { return parse_succeeded && parsed_value; } +// TODO (donnadionne): Check to see if cluster types aggregate_cluster and +// logical_dns are enabled, this will be +// removed once the cluster types are fully integration-tested and enabled by +// default. +bool XdsAggregateAndLogicalDnsClusterEnabled() { + char* value = gpr_getenv( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value); + gpr_free(value); + return parse_succeeded && parsed_value; +} + // TODO(yashykt): Check to see if xDS security is enabled. This will be // removed once this feature is fully integration-tested and enabled by // default. @@ -1608,29 +1622,79 @@ grpc_error* CdsResponseParse( } XdsApi::CdsUpdate& cds_update = (*cds_update_map)[std::move(cluster_name)]; // Check the cluster_discovery_type. - if (!envoy_config_cluster_v3_Cluster_has_type(cluster)) { + if (!envoy_config_cluster_v3_Cluster_has_type(cluster) && + !envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found."); } - if (envoy_config_cluster_v3_Cluster_type(cluster) != + if (envoy_config_cluster_v3_Cluster_type(cluster) == envoy_config_cluster_v3_Cluster_EDS) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not EDS."); - } - // Check the EDS config source. - const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config = - envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); - const envoy_config_core_v3_ConfigSource* eds_config = - envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config( - eds_cluster_config); - if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) { + cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::EDS; + // Check the EDS config source. + const envoy_config_cluster_v3_Cluster_EdsClusterConfig* + eds_cluster_config = + envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); + const envoy_config_core_v3_ConfigSource* eds_config = + envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config( + eds_cluster_config); + if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "EDS ConfigSource is not ADS."); + } + // Record EDS service_name (if any). + upb_strview service_name = + envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( + eds_cluster_config); + if (service_name.size != 0) { + cds_update.eds_service_name = UpbStringToStdString(service_name); + } + } else if (!XdsAggregateAndLogicalDnsClusterEnabled()) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "EDS ConfigSource is not ADS."); - } - // Record EDS service_name (if any). - upb_strview service_name = - envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( - eds_cluster_config); - if (service_name.size != 0) { - cds_update.eds_service_name = UpbStringToStdString(service_name); + "DiscoveryType is not valid."); + } else if (envoy_config_cluster_v3_Cluster_type(cluster) == + envoy_config_cluster_v3_Cluster_LOGICAL_DNS) { + cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS; + } else { + if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { + const envoy_config_cluster_v3_Cluster_CustomClusterType* + custom_cluster_type = + envoy_config_cluster_v3_Cluster_cluster_type(cluster); + upb_strview type_name = + envoy_config_cluster_v3_Cluster_CustomClusterType_name( + custom_cluster_type); + if (UpbStringToAbsl(type_name) == "envoy.clusters.aggregate") { + cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::AGGREGATE; + // Retrieve aggregate clusters. + const google_protobuf_Any* typed_config = + envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config( + custom_cluster_type); + const upb_strview aggregate_cluster_config_upb_strview = + google_protobuf_Any_value(typed_config); + const envoy_extensions_clusters_aggregate_v3_ClusterConfig* + aggregate_cluster_config = + envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse( + aggregate_cluster_config_upb_strview.data, + aggregate_cluster_config_upb_strview.size, arena); + if (aggregate_cluster_config == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Can't parse aggregate cluster."); + } + size_t size; + const upb_strview* clusters = + envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters( + aggregate_cluster_config, &size); + for (size_t i = 0; i < size; ++i) { + const upb_strview cluster = clusters[i]; + cds_update.prioritized_cluster_names.emplace_back( + UpbStringToStdString(cluster)); + } + } else { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "DiscoveryType is not valid."); + } + } else { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "DiscoveryType is not valid."); + } } // Check the LB policy. if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) != diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index f1718482f15..b338ac52394 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -229,6 +229,9 @@ class XdsApi { using RdsUpdateMap = std::map; struct CdsUpdate { + enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE }; + ClusterType cluster_type; + // For cluster type EDS. // The name to use in the EDS request. // If empty, the cluster name will be used. std::string eds_service_name; @@ -242,12 +245,17 @@ class XdsApi { // Maximum number of outstanding requests can be made to the upstream // cluster. uint32_t max_concurrent_requests = 1024; + // For cluster type AGGREGATE. + // The prioritized list of cluster names. + std::vector prioritized_cluster_names; bool operator==(const CdsUpdate& other) const { - return eds_service_name == other.eds_service_name && + return cluster_type == other.cluster_type && + eds_service_name == other.eds_service_name && common_tls_context == other.common_tls_context && lrs_load_reporting_server_name == other.lrs_load_reporting_server_name && + prioritized_cluster_names == other.prioritized_cluster_names && max_concurrent_requests == other.max_concurrent_requests; } diff --git a/src/proto/grpc/testing/xds/cds_for_test.proto b/src/proto/grpc/testing/xds/cds_for_test.proto index 8ea198d0ee7..3d3c1c164fb 100644 --- a/src/proto/grpc/testing/xds/cds_for_test.proto +++ b/src/proto/grpc/testing/xds/cds_for_test.proto @@ -72,6 +72,15 @@ message CircuitBreakers { repeated Thresholds thresholds = 1; } +message ClusterConfig { + repeated string clusters = 1; +} + +message CustomClusterType { + string name = 1; + ClusterConfig typed_config = 2; +} + message Cluster { // Refer to :ref:`service discovery type ` // for an explanation on each type. @@ -106,6 +115,9 @@ message Cluster { // The :ref:`service discovery type ` // to use for resolving the cluster. DiscoveryType type = 2; + + // The custom cluster type: aggregate cluster in this case. + CustomClusterType cluster_type = 38; } // Only valid when discovery type is EDS. diff --git a/src/proto/grpc/testing/xds/v3/BUILD b/src/proto/grpc/testing/xds/v3/BUILD index ce99015c70c..6d212fa72c1 100644 --- a/src/proto/grpc/testing/xds/v3/BUILD +++ b/src/proto/grpc/testing/xds/v3/BUILD @@ -204,6 +204,17 @@ grpc_proto_library( ], ) +grpc_proto_library( + name = "aggregate_cluster_proto", + srcs = [ + "aggregate_cluster.proto", + ], + well_known_protos = True, + deps = [ + "string_proto", + ], +) + grpc_proto_library( name = "tls_proto", srcs = [ diff --git a/src/proto/grpc/testing/xds/v3/aggregate_cluster.proto b/src/proto/grpc/testing/xds/v3/aggregate_cluster.proto new file mode 100644 index 00000000000..134e369c10c --- /dev/null +++ b/src/proto/grpc/testing/xds/v3/aggregate_cluster.proto @@ -0,0 +1,30 @@ +// Copyright 2020 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Local copy of Envoy xDS proto file, used for testing only. + +syntax = "proto3"; + +package envoy.extensions.clusters.aggregate.v3; + +import "src/proto/grpc/testing/xds/v3/string.proto"; + +// Configuration for the aggregate cluster. See the :ref:`architecture overview +// ` for more information. +// [#extension: envoy.clusters.aggregate] +message ClusterConfig { + // Load balancing clusters in aggregate cluster. Clusters are prioritized based on the order they + // appear in this list. + repeated string clusters = 1; +} diff --git a/src/proto/grpc/testing/xds/v3/cluster.proto b/src/proto/grpc/testing/xds/v3/cluster.proto index c493e865973..c04fe20a919 100644 --- a/src/proto/grpc/testing/xds/v3/cluster.proto +++ b/src/proto/grpc/testing/xds/v3/cluster.proto @@ -21,6 +21,7 @@ package envoy.config.cluster.v3; import "src/proto/grpc/testing/xds/v3/base.proto"; import "src/proto/grpc/testing/xds/v3/config_source.proto"; +import "google/protobuf/any.proto"; import "google/protobuf/wrappers.proto"; enum RoutingPriority { @@ -36,6 +37,16 @@ message CircuitBreakers { repeated Thresholds thresholds = 1; } +// Extended cluster type. +message CustomClusterType { + // The type of the cluster to instantiate. The name must match a supported cluster type. + string name = 1; + + // Cluster specific configuration which depends on the cluster being instantiated. + // See the supported cluster for further documentation. + google.protobuf.Any typed_config = 2; +} + // [#protodoc-title: Cluster configuration] // Configuration for a single upstream cluster. @@ -134,6 +145,9 @@ message Cluster { // The :ref:`service discovery type ` // to use for resolving the cluster. DiscoveryType type = 2; + + // The custom cluster type. + CustomClusterType cluster_type = 38; } // Configuration to use for EDS updates for the Cluster. diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 008482e043b..6ae8ac339ec 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -151,6 +151,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c', 'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c', 'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c', + 'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c', 'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c', 'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c', @@ -237,6 +238,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c', + 'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c', 'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c', diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 1e7a07806a5..d0bee59ff8b 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -536,6 +536,7 @@ grpc_cc_test( "//src/proto/grpc/testing/xds:lds_rds_for_test_proto", "//src/proto/grpc/testing/xds:lrs_for_test_proto", "//src/proto/grpc/testing/xds/v3:ads_proto", + "//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto", "//src/proto/grpc/testing/xds/v3:cluster_proto", "//src/proto/grpc/testing/xds/v3:discovery_proto", "//src/proto/grpc/testing/xds/v3:endpoint_proto", diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index c0594f904de..4c6e38341d2 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -49,6 +49,7 @@ #include #include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/xds/certificate_provider_registry.h" @@ -81,6 +82,7 @@ #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h" +#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h" @@ -98,11 +100,13 @@ using std::chrono::system_clock; using ::envoy::config::cluster::v3::CircuitBreakers; using ::envoy::config::cluster::v3::Cluster; +using ::envoy::config::cluster::v3::CustomClusterType; using ::envoy::config::cluster::v3::RoutingPriority; using ::envoy::config::endpoint::v3::ClusterLoadAssignment; using ::envoy::config::endpoint::v3::HealthStatus; using ::envoy::config::listener::v3::Listener; using ::envoy::config::route::v3::RouteConfiguration; +using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig; using ::envoy::extensions::filters::network::http_connection_manager::v3:: HttpConnectionManager; using ::envoy::extensions::transport_sockets::tls::v3::DownstreamTlsContext; @@ -1498,6 +1502,28 @@ std::shared_ptr CreateTlsFallbackCredentials() { return channel_creds; } +namespace { + +void* response_generator_arg_copy(void* p) { + auto* generator = static_cast(p); + generator->Ref().release(); + return p; +} + +void response_generator_arg_destroy(void* p) { + auto* generator = static_cast(p); + generator->Unref(); +} + +int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); } + +const grpc_arg_pointer_vtable + kLogicalDnsClusterResolverResponseGeneratorVtable = { + response_generator_arg_copy, response_generator_arg_destroy, + response_generator_cmp}; + +} // namespace + class XdsEnd2endTest : public ::testing::TestWithParam { protected: XdsEnd2endTest(size_t num_backends, size_t num_balancers, @@ -1535,6 +1561,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam { xds_channel_args_to_add_.emplace_back( grpc_core::FakeResolverResponseGenerator::MakeChannelArg( lb_channel_response_generator_.get())); + // Inject xDS logical cluster resolver response generator. + logical_dns_cluster_resolver_response_generator_ = + grpc_core::MakeRefCounted(); if (xds_resource_does_not_exist_timeout_ms_ > 0) { xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create( const_cast(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS), @@ -1640,6 +1669,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam { args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator); } + args.SetPointerWithVtable( + GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR, + logical_dns_cluster_resolver_response_generator_.get(), + &kLogicalDnsClusterResolverResponseGeneratorVtable); std::string uri = absl::StrCat( GetParam().use_xds_resolver() ? "xds" : "fake", ":///", server_name); std::shared_ptr channel_creds = @@ -2270,6 +2303,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam { response_generator_; grpc_core::RefCountedPtr lb_channel_response_generator_; + grpc_core::RefCountedPtr + logical_dns_cluster_resolver_response_generator_; int xds_resource_does_not_exist_timeout_ms_ = 0; absl::InlinedVector xds_channel_args_to_add_; grpc_channel_args xds_channel_args_; @@ -5300,9 +5335,172 @@ TEST_P(CdsTest, Vanilla) { AdsServiceImpl::ResponseState::ACKED); } +TEST_P(CdsTest, LogicalDNSClusterType) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", + "true"); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Create Logical DNS Cluster + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Set Logical DNS result + { + grpc_core::ExecCtx exec_ctx; + grpc_core::Resolver::Result result; + result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2)); + logical_dns_cluster_resolver_response_generator_->SetResponse( + std::move(result)); + } + // Wait for traffic to go to backend 1. + WaitForBackend(1); + gpr_unsetenv( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); +} + +TEST_P(CdsTest, AggregateClusterType) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", + "true"); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Populate new EDS resources. + AdsServiceImpl::EdsResourceArgs args1({ + {"locality0", GetBackendPorts(1, 2)}, + }); + AdsServiceImpl::EdsResourceArgs args2({ + {"locality0", GetBackendPorts(2, 3)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args1, kNewEdsService1Name)); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args2, kNewEdsService2Name)); + // Populate new CDS resources. + Cluster new_cluster1 = default_cluster_; + new_cluster1.set_name(kNewCluster1Name); + new_cluster1.mutable_eds_cluster_config()->set_service_name( + kNewEdsService1Name); + balancers_[0]->ads_service()->SetCdsResource(new_cluster1); + Cluster new_cluster2 = default_cluster_; + new_cluster2.set_name(kNewCluster2Name); + new_cluster2.mutable_eds_cluster_config()->set_service_name( + kNewEdsService2Name); + balancers_[0]->ads_service()->SetCdsResource(new_cluster2); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kNewCluster1Name); + cluster_config.add_clusters(kNewCluster2Name); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Wait for traffic to go to backend 1. + WaitForBackend(1); + // Shutdown backend 1 and wait for all traffic to go to backend 2. + ShutdownBackend(1); + WaitForBackend(2); + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state().state, + AdsServiceImpl::ResponseState::ACKED); + gpr_unsetenv( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); +} + +TEST_P(CdsTest, AggregateClusterMixedType) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", + "true"); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + const char* kLogicalDNSClusterName = "logical_dns_cluster"; + // Populate new EDS resources. + AdsServiceImpl::EdsResourceArgs args2({ + {"locality0", GetBackendPorts(2, 3)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args2, kNewEdsService2Name)); + // Populate new CDS resources. + Cluster new_cluster2 = default_cluster_; + new_cluster2.set_name(kNewCluster2Name); + new_cluster2.mutable_eds_cluster_config()->set_service_name( + kNewEdsService2Name); + balancers_[0]->ads_service()->SetCdsResource(new_cluster2); + // Create Logical DNS Cluster + auto logical_dns_cluster = default_cluster_; + logical_dns_cluster.set_name(kLogicalDNSClusterName); + logical_dns_cluster.set_type(Cluster::LOGICAL_DNS); + balancers_[0]->ads_service()->SetCdsResource(logical_dns_cluster); + // Create Aggregate Cluster + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters(kLogicalDNSClusterName); + cluster_config.add_clusters(kNewCluster2Name); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Set Logical DNS result + { + grpc_core::ExecCtx exec_ctx; + grpc_core::Resolver::Result result; + result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2)); + logical_dns_cluster_resolver_response_generator_->SetResponse( + std::move(result)); + } + // Wait for traffic to go to backend 1. + WaitForBackend(1); + // Shutdown backend 1 and wait for all traffic to go to backend 2. + ShutdownBackend(1); + WaitForBackend(2); + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state().state, + AdsServiceImpl::ResponseState::ACKED); + gpr_unsetenv( + "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"); +} + +// Test that CDS client should send a NACK if cluster type is Logical DNS but +// the feature is not yet supported. +TEST_P(CdsTest, LogicalDNSClusterTypeDisabled) { + auto cluster = default_cluster_; + cluster.set_type(Cluster::LOGICAL_DNS); + balancers_[0]->ads_service()->SetCdsResource(cluster); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); + const auto& response_state = + balancers_[0]->ads_service()->cds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_EQ(response_state.error_message, "DiscoveryType is not valid."); +} + +// Test that CDS client should send a NACK if cluster type is AGGREGATE but +// the feature is not yet supported. +TEST_P(CdsTest, AggregateClusterTypeDisabled) { + auto cluster = default_cluster_; + CustomClusterType* custom_cluster = cluster.mutable_cluster_type(); + custom_cluster->set_name("envoy.clusters.aggregate"); + ClusterConfig cluster_config; + cluster_config.add_clusters("cluster1"); + cluster_config.add_clusters("cluster2"); + custom_cluster->mutable_typed_config()->PackFrom(cluster_config); + cluster.set_type(Cluster::LOGICAL_DNS); + balancers_[0]->ads_service()->SetCdsResource(cluster); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); + const auto& response_state = + balancers_[0]->ads_service()->cds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_EQ(response_state.error_message, "DiscoveryType is not valid."); +} + // Tests that CDS client should send a NACK if the cluster type in CDS response -// is other than EDS. -TEST_P(CdsTest, WrongClusterType) { +// is unsupported. +TEST_P(CdsTest, UnsupportedClusterType) { auto cluster = default_cluster_; cluster.set_type(Cluster::STATIC); balancers_[0]->ads_service()->SetCdsResource(cluster); @@ -5312,7 +5510,7 @@ TEST_P(CdsTest, WrongClusterType) { const auto& response_state = balancers_[0]->ads_service()->cds_response_state(); EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); - EXPECT_EQ(response_state.error_message, "DiscoveryType is not EDS."); + EXPECT_EQ(response_state.error_message, "DiscoveryType is not valid."); } // Tests that CDS client should send a NACK if the eds_config in CDS response is diff --git a/tools/codegen/core/gen_upb_api.sh b/tools/codegen/core/gen_upb_api.sh index 6196a2da827..230c8b3ab82 100755 --- a/tools/codegen/core/gen_upb_api.sh +++ b/tools/codegen/core/gen_upb_api.sh @@ -75,6 +75,7 @@ proto_files=( \ "envoy/config/route/v3/route_components.proto" \ "envoy/config/route/v3/scoped_route.proto" \ "envoy/config/trace/v3/http_tracer.proto" \ + "envoy/extensions/clusters/aggregate/v3/cluster.proto" \ "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto" \ "envoy/extensions/transport_sockets/tls/v3/cert.proto" \ "envoy/extensions/transport_sockets/tls/v3/common.proto" \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 8f5132b30ce..d1b3909537f 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1289,6 +1289,8 @@ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h \ src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \ src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h \ +src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \ +src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h \ src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \ src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h \ src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \ @@ -1459,6 +1461,8 @@ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h \ src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h \ +src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \ +src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h \ src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h \ src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index c6b920dfc3c..c5c5663b6b9 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1125,6 +1125,8 @@ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h \ src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \ src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h \ +src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \ +src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h \ src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \ src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h \ src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \ @@ -1295,6 +1297,8 @@ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h \ src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h \ +src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \ +src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h \ src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \ src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h \ src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \