Merge pull request #25078 from donnadionne/watchers

Cds Watchers changes to support aggregate Cds.
pull/25245/head
donnadionne 4 years ago committed by GitHub
commit 7338639ccc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      BUILD
  2. 4
      BUILD.gn
  3. 9
      CMakeLists.txt
  4. 4
      Makefile
  5. 5
      build_autogenerated.yaml
  6. 4
      config.m4
  7. 8
      config.w32
  8. 4
      gRPC-C++.podspec
  9. 6
      gRPC-Core.podspec
  10. 4
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 4
      package.xml
  13. 373
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  14. 5
      src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h
  15. 49
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  16. 29
      src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c
  17. 67
      src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h
  18. 51
      src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c
  19. 35
      src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h
  20. 102
      src/core/ext/xds/xds_api.cc
  21. 10
      src/core/ext/xds/xds_api.h
  22. 12
      src/proto/grpc/testing/xds/cds_for_test.proto
  23. 11
      src/proto/grpc/testing/xds/v3/BUILD
  24. 30
      src/proto/grpc/testing/xds/v3/aggregate_cluster.proto
  25. 14
      src/proto/grpc/testing/xds/v3/cluster.proto
  26. 2
      src/python/grpcio/grpc_core_dependencies.py
  27. 1
      test/cpp/end2end/BUILD
  28. 204
      test/cpp/end2end/xds_end2end_test.cc
  29. 1
      tools/codegen/core/gen_upb_api.sh
  30. 4
      tools/doxygen/Doxyfile.c++.internal
  31. 4
      tools/doxygen/Doxyfile.core.internal

@ -1479,7 +1479,9 @@ grpc_cc_library(
"grpc_base",
"grpc_client_channel",
"grpc_lb_address_filtering",
"grpc_lb_xds_channel_args",
"grpc_lb_xds_common",
"grpc_resolver_fake",
"grpc_xds_client",
],
)
@ -2628,6 +2630,7 @@ grpc_cc_library(
"src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c",
"src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c",
"src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c",
"src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c",
"src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c",
"src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c",
"src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c",
@ -2660,6 +2663,7 @@ grpc_cc_library(
"src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h",
"src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h",
"src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h",
"src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h",
"src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h",
"src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h",
"src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h",
@ -2709,6 +2713,7 @@ grpc_cc_library(
"src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c",
@ -2740,6 +2745,7 @@ grpc_cc_library(
"src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h",

@ -457,6 +457,8 @@ config("grpc_config") {
"src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h",
"src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c",
"src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h",
"src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c",
"src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h",
"src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c",
"src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h",
"src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c",
@ -627,6 +629,8 @@ config("grpc_config") {
"src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c",
"src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h",
"src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c",

@ -457,6 +457,9 @@ protobuf_generate_grpc_cpp(
protobuf_generate_grpc_cpp(
src/proto/grpc/testing/xds/v3/ads.proto
)
protobuf_generate_grpc_cpp(
src/proto/grpc/testing/xds/v3/aggregate_cluster.proto
)
protobuf_generate_grpc_cpp(
src/proto/grpc/testing/xds/v3/base.proto
)
@ -1582,6 +1585,7 @@ add_library(grpc
src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c
src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c
@ -1667,6 +1671,7 @@ add_library(grpc
src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c
src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c
@ -15616,6 +15621,10 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h

@ -1169,6 +1169,7 @@ LIBGRPC_SRC = \
src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c \
src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c \
@ -1254,6 +1255,7 @@ LIBGRPC_SRC = \
src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c \
@ -2703,6 +2705,7 @@ src/core/ext/upb-generated/envoy/config/route/v3/route.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c: $(OPENSSL_DEP)
@ -2773,6 +2776,7 @@ src/core/ext/upbdefs-generated/envoy/config/route/v3/route.upbdefs.c: $(OPENSSL_
src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c: $(OPENSSL_DEP)
src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c: $(OPENSSL_DEP)
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c: $(OPENSSL_DEP)
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c: $(OPENSSL_DEP)
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c: $(OPENSSL_DEP)
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c: $(OPENSSL_DEP)
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c: $(OPENSSL_DEP)

@ -497,6 +497,7 @@ libs:
- src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h
- src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h
- src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h
- src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h
- src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h
- src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h
- src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h
@ -582,6 +583,7 @@ libs:
- src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h
- src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h
- src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h
- src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h
- src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h
- src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h
- src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h
@ -1000,6 +1002,7 @@ libs:
- src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c
- src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c
- src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c
- src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c
- src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c
- src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c
- src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c
@ -1085,6 +1088,7 @@ libs:
- src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c
- src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c
- src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c
- src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c
- src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c
- src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c
- src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c
@ -7984,6 +7988,7 @@ targets:
- src/proto/grpc/testing/xds/lrs_for_test.proto
- src/proto/grpc/testing/xds/v3/address.proto
- src/proto/grpc/testing/xds/v3/ads.proto
- src/proto/grpc/testing/xds/v3/aggregate_cluster.proto
- src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/cluster.proto
- src/proto/grpc/testing/xds/v3/config_source.proto

@ -175,6 +175,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c \
src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c \
@ -261,6 +262,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c \
@ -1046,6 +1048,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/config/rbac/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/config/route/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/config/trace/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upb-generated/envoy/service/cluster/v3)
@ -1077,6 +1080,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/config/listener/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/config/route/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/config/trace/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-generated/envoy/service/cluster/v3)

@ -142,6 +142,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\upb-generated\\envoy\\config\\route\\v3\\route_components.upb.c " +
"src\\core\\ext\\upb-generated\\envoy\\config\\route\\v3\\scoped_route.upb.c " +
"src\\core\\ext\\upb-generated\\envoy\\config\\trace\\v3\\http_tracer.upb.c " +
"src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters\\aggregate\\v3\\cluster.upb.c " +
"src\\core\\ext\\upb-generated\\envoy\\extensions\\filters\\network\\http_connection_manager\\v3\\http_connection_manager.upb.c " +
"src\\core\\ext\\upb-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\cert.upb.c " +
"src\\core\\ext\\upb-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\common.upb.c " +
@ -228,6 +229,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\upbdefs-generated\\envoy\\config\\route\\v3\\route_components.upbdefs.c " +
"src\\core\\ext\\upbdefs-generated\\envoy\\config\\route\\v3\\scoped_route.upbdefs.c " +
"src\\core\\ext\\upbdefs-generated\\envoy\\config\\trace\\v3\\http_tracer.upbdefs.c " +
"src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters\\aggregate\\v3\\cluster.upbdefs.c " +
"src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters\\network\\http_connection_manager\\v3\\http_connection_manager.upbdefs.c " +
"src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\cert.upbdefs.c " +
"src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\transport_sockets\\tls\\v3\\common.upbdefs.c " +
@ -1061,6 +1063,9 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\config\\trace");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\config\\trace\\v3");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters\\aggregate");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\clusters\\aggregate\\v3");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\filters");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\filters\\network");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upb-generated\\envoy\\extensions\\filters\\network\\http_connection_manager");
@ -1130,6 +1135,9 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\config\\trace");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\config\\trace\\v3");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters\\aggregate");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\clusters\\aggregate\\v3");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters\\network");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-generated\\envoy\\extensions\\filters\\network\\http_connection_manager");

@ -314,6 +314,7 @@ Pod::Spec.new do |s|
'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h',
'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h',
'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h',
'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h',
'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h',
@ -400,6 +401,7 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h',
@ -932,6 +934,7 @@ Pod::Spec.new do |s|
'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h',
'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h',
'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h',
'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h',
'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h',
@ -1018,6 +1021,7 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h',

@ -439,6 +439,8 @@ Pod::Spec.new do |s|
'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h',
'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c',
'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h',
'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c',
'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h',
'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c',
'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c',
@ -611,6 +613,8 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c',
@ -1464,6 +1468,7 @@ Pod::Spec.new do |s|
'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.h',
'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h',
'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h',
'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h',
'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.h',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.h',
@ -1550,6 +1555,7 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.h',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.h',

@ -354,6 +354,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h )
s.files += %w( src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c )
s.files += %w( src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h )
s.files += %w( src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c )
s.files += %w( src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h )
s.files += %w( src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c )
s.files += %w( src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h )
s.files += %w( src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c )
@ -526,6 +528,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h )
s.files += %w( src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c )
s.files += %w( src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h )
s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c )
s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h )
s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c )
s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h )
s.files += %w( src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c )

@ -588,6 +588,7 @@
'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c',
'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c',
'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c',
'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c',
'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c',
@ -673,6 +674,7 @@
'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c',

@ -334,6 +334,8 @@
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c" role="src" />
@ -506,6 +508,8 @@
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c" role="src" />

@ -68,21 +68,25 @@ class CdsLb : public LoadBalancingPolicy {
// Watcher for getting cluster data from XdsClient.
class ClusterWatcher : public XdsClient::ClusterWatcherInterface {
public:
explicit ClusterWatcher(RefCountedPtr<CdsLb> parent)
: parent_(std::move(parent)) {}
ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
: parent_(std::move(parent)), name_(std::move(name)) {}
void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override {
new Notifier(parent_, std::move(cluster_data));
new Notifier(parent_, name_, std::move(cluster_data));
}
void OnError(grpc_error* error) override { new Notifier(parent_, error); }
void OnResourceDoesNotExist() override { new Notifier(parent_); }
void OnError(grpc_error* error) override {
new Notifier(parent_, name_, error);
}
void OnResourceDoesNotExist() override { new Notifier(parent_, name_); }
private:
class Notifier {
public:
Notifier(RefCountedPtr<CdsLb> parent, XdsApi::CdsUpdate update);
Notifier(RefCountedPtr<CdsLb> parent, grpc_error* error);
explicit Notifier(RefCountedPtr<CdsLb> parent);
Notifier(RefCountedPtr<CdsLb> parent, std::string name,
XdsApi::CdsUpdate update);
Notifier(RefCountedPtr<CdsLb> parent, std::string name,
grpc_error* error);
explicit Notifier(RefCountedPtr<CdsLb> parent, std::string name);
private:
enum Type { kUpdate, kError, kDoesNotExist };
@ -91,12 +95,22 @@ class CdsLb : public LoadBalancingPolicy {
void RunInWorkSerializer(grpc_error* error);
RefCountedPtr<CdsLb> parent_;
std::string name_;
grpc_closure closure_;
XdsApi::CdsUpdate update_;
Type type_;
};
RefCountedPtr<CdsLb> parent_;
std::string name_;
};
struct WatcherState {
// Pointer to watcher, to be used when cancelling.
// Not owned, so do not dereference.
ClusterWatcher* watcher = nullptr;
// Most recent update obtained from this watcher.
absl::optional<XdsApi::CdsUpdate> update;
};
// Delegating helper to be passed to child policy.
@ -119,12 +133,20 @@ class CdsLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
void OnClusterChanged(XdsApi::CdsUpdate cluster_data);
void OnError(grpc_error* error);
void OnResourceDoesNotExist();
bool GenerateDiscoveryMechanismForCluster(
const std::string& name, Json::Array* discovery_mechanisms,
std::set<std::string>* clusters_needed);
void OnClusterChanged(const std::string& name,
XdsApi::CdsUpdate cluster_data);
void OnError(const std::string& name, grpc_error* error);
void OnResourceDoesNotExist(const std::string& name);
grpc_error* UpdateXdsCertificateProvider(
const XdsApi::CdsUpdate& cluster_data);
const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data);
void CancelClusterDataWatch(absl::string_view cluster_name,
XdsClient::ClusterWatcherInterface* watcher,
bool delay_unsubscription = false);
void MaybeDestroyChildPolicyLocked();
@ -135,9 +157,10 @@ class CdsLb : public LoadBalancingPolicy {
// The xds client.
RefCountedPtr<XdsClient> xds_client_;
// A pointer to the cluster watcher, to be used when cancelling the watch.
// Note that this is not owned, so this pointer must never be derefernced.
ClusterWatcher* cluster_watcher_ = nullptr;
// Maps from cluster name to the state for that cluster.
// The root of the tree is config_->cluster().
std::map<std::string, WatcherState> watchers_;
RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
@ -155,21 +178,26 @@ class CdsLb : public LoadBalancingPolicy {
//
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name,
XdsApi::CdsUpdate update)
: parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) {
: parent_(std::move(parent)),
name_(std::move(name)),
update_(std::move(update)),
type_(kUpdate) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
grpc_error* error)
: parent_(std::move(parent)), type_(kError) {
std::string name, grpc_error* error)
: parent_(std::move(parent)), name_(std::move(name)), type_(kError) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
}
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent)
: parent_(std::move(parent)), type_(kDoesNotExist) {
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name)
: parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
@ -185,13 +213,13 @@ void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg,
void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(grpc_error* error) {
switch (type_) {
case kUpdate:
parent_->OnClusterChanged(std::move(update_));
parent_->OnClusterChanged(name_, std::move(update_));
break;
case kError:
parent_->OnError(error);
parent_->OnError(name_, error);
break;
case kDoesNotExist:
parent_->OnResourceDoesNotExist();
parent_->OnResourceDoesNotExist(name_);
break;
};
delete this;
@ -261,13 +289,15 @@ void CdsLb::ShutdownLocked() {
shutting_down_ = true;
MaybeDestroyChildPolicyLocked();
if (xds_client_ != nullptr) {
if (cluster_watcher_ != nullptr) {
for (auto& watcher : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
config_->cluster().c_str());
watcher.first.c_str());
}
xds_client_->CancelClusterDataWatch(config_->cluster(), cluster_watcher_);
CancelClusterDataWatch(watcher.first, watcher.second.watcher,
/*delay_unsubscription=*/false);
}
watchers_.clear();
xds_client_.reset(DEBUG_LOCATION, "CdsLb");
}
grpc_channel_args_destroy(args_);
@ -301,119 +331,203 @@ void CdsLb::UpdateLocked(UpdateArgs args) {
// If cluster name changed, cancel watcher and restart.
if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
if (old_config != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
old_config->cluster().c_str());
for (auto& watcher : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
watcher.first.c_str());
}
CancelClusterDataWatch(watcher.first, watcher.second.watcher,
/*delay_unsubscription=*/true);
}
xds_client_->CancelClusterDataWatch(old_config->cluster(),
cluster_watcher_,
/*delay_unsubscription=*/true);
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
config_->cluster().c_str());
watchers_.clear();
}
auto watcher = absl::make_unique<ClusterWatcher>(Ref());
cluster_watcher_ = watcher.get();
auto watcher = absl::make_unique<ClusterWatcher>(Ref(), config_->cluster());
watchers_[config_->cluster()].watcher = watcher.get();
xds_client_->WatchClusterData(config_->cluster(), std::move(watcher));
}
}
void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client %p: %s",
this, xds_client_.get(), cluster_data.ToString().c_str());
// This method will attempt to generate one or multiple entries of discovery
// mechanism recursively:
// For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be
// generated cluster name, type and other data from the CdsUpdate inserted into
// the entry and the entry appended to the array of entries.
// Note, discovery mechanism entry can be generated if an CdsUpdate is
// available; otherwise, just return false. For cluster type AGGREGATE,
// recursively call the method for each child cluster.
bool CdsLb::GenerateDiscoveryMechanismForCluster(
const std::string& name, Json::Array* discovery_mechanisms,
std::set<std::string>* clusters_needed) {
clusters_needed->insert(name);
auto& state = watchers_[name];
// Create a new watcher if needed.
if (state.watcher == nullptr) {
auto watcher = absl::make_unique<ClusterWatcher>(Ref(), name);
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
name.c_str());
}
state.watcher = watcher.get();
xds_client_->WatchClusterData(name, std::move(watcher));
return false;
}
grpc_error* error = GRPC_ERROR_NONE;
error = UpdateXdsCertificateProvider(cluster_data);
if (error != GRPC_ERROR_NONE) {
return OnError(error);
// Don't have the update we need yet.
if (!state.update.has_value()) return false;
// For AGGREGATE clusters, recursively expand to child clusters.
if (state.update->cluster_type == XdsApi::CdsUpdate::ClusterType::AGGREGATE) {
bool missing_cluster = false;
for (const std::string& child_name :
state.update->prioritized_cluster_names) {
if (!GenerateDiscoveryMechanismForCluster(
child_name, discovery_mechanisms, clusters_needed)) {
missing_cluster = true;
}
}
return !missing_cluster;
}
// Construct config for child policy.
Json::Object discovery_mechanism = {
{"clusterName", config_->cluster()},
{"max_concurrent_requests", cluster_data.max_concurrent_requests},
{"type", "EDS"},
std::string type;
switch (state.update->cluster_type) {
case XdsApi::CdsUpdate::ClusterType::EDS:
type = "EDS";
break;
case XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS:
type = "LOGICAL_DNS";
break;
default:
GPR_ASSERT(0);
break;
}
Json::Object mechanism = {
{"clusterName", name},
{"max_concurrent_requests", state.update->max_concurrent_requests},
{"type", std::move(type)},
};
if (!cluster_data.eds_service_name.empty()) {
discovery_mechanism["edsServiceName"] = cluster_data.eds_service_name;
if (!state.update->eds_service_name.empty()) {
mechanism["edsServiceName"] = state.update->eds_service_name;
}
if (cluster_data.lrs_load_reporting_server_name.has_value()) {
discovery_mechanism["lrsLoadReportingServerName"] =
cluster_data.lrs_load_reporting_server_name.value();
if (state.update->lrs_load_reporting_server_name.has_value()) {
mechanism["lrsLoadReportingServerName"] =
state.update->lrs_load_reporting_server_name.value();
}
Json::Object child_config = {
{"discoveryMechanisms",
Json::Array{
discovery_mechanism,
}},
{"localityPickingPolicy",
Json::Array{
Json::Object{
{"weighted_target_experimental",
Json::Object{
{"targets", Json::Object()},
}},
},
}},
{"endpointPickingPolicy",
Json::Array{
Json::Object{
{"round_robin", Json::Object()},
},
}},
};
Json json = Json::Array{
Json::Object{
{"xds_cluster_resolver_experimental", std::move(child_config)},
},
};
discovery_mechanisms->emplace_back(std::move(mechanism));
return true;
}
void CdsLb::OnClusterChanged(const std::string& name,
XdsApi::CdsUpdate cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
std::string json_str = json.Dump(/*indent=*/1);
gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
json_str.c_str());
gpr_log(
GPR_INFO,
"[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
}
RefCountedPtr<LoadBalancingPolicy::Config> config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
// Store the update in the map if we are still interested in watching this
// cluster (i.e., it is not cancelled already).
// If we've already deleted this entry, then this is an update notification
// that was scheduled before the deletion, so we can just ignore it.
auto it = watchers_.find(name);
if (it == watchers_.end()) return;
it->second.update = std::move(cluster_data);
// Take care of integration with new certificate code.
grpc_error* error = GRPC_ERROR_NONE;
error = UpdateXdsCertificateProvider(name, it->second.update.value());
if (error != GRPC_ERROR_NONE) {
OnError(error);
return;
return OnError(name, error);
}
// Create child policy if not already present.
if (child_policy_ == nullptr) {
LoadBalancingPolicy::Args args;
args.work_serializer = work_serializer();
args.args = args_;
args.channel_control_helper = absl::make_unique<Helper>(Ref());
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
config->name(), std::move(args));
if (child_policy_ == nullptr) {
OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"failed to create child policy"));
// Scan the map starting from the root cluster to generate the list of
// discovery mechanisms. If we don't have some of the data we need (i.e., we
// just started up and not all watchers have returned data yet), then don't
// update the child policy at all.
Json::Array discovery_mechanisms;
std::set<std::string> clusters_needed;
if (GenerateDiscoveryMechanismForCluster(
config_->cluster(), &discovery_mechanisms, &clusters_needed)) {
// Construct config for child policy.
Json::Object child_config = {
{"localityPickingPolicy",
Json::Array{
Json::Object{
{"weighted_target_experimental",
Json::Object{
{"targets", Json::Object()},
}},
},
}},
{"endpointPickingPolicy",
Json::Array{
Json::Object{
{"round_robin", Json::Object()},
},
}},
{"discoveryMechanisms", std::move(discovery_mechanisms)},
};
Json json = Json::Array{
Json::Object{
{"xds_cluster_resolver_experimental", std::move(child_config)},
},
};
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
std::string json_str = json.Dump(/*indent=*/1);
gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s",
this, json_str.c_str());
}
RefCountedPtr<LoadBalancingPolicy::Config> config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
if (error != GRPC_ERROR_NONE) {
OnError(name, error);
return;
}
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
config->name(), child_policy_.get());
// Create child policy if not already present.
if (child_policy_ == nullptr) {
LoadBalancingPolicy::Args args;
args.work_serializer = work_serializer();
args.args = args_;
args.channel_control_helper = absl::make_unique<Helper>(Ref());
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
config->name(), std::move(args));
if (child_policy_ == nullptr) {
OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"failed to create child policy"));
return;
}
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
config->name(), child_policy_.get());
}
}
// Update child policy.
UpdateArgs args;
args.config = std::move(config);
if (xds_certificate_provider_ != nullptr) {
grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
} else {
args.args = grpc_channel_args_copy(args_);
}
child_policy_->UpdateLocked(std::move(args));
}
// Update child policy.
UpdateArgs args;
args.config = std::move(config);
if (xds_certificate_provider_ != nullptr) {
grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
} else {
args.args = grpc_channel_args_copy(args_);
// Remove entries in watchers_ for any clusters not in clusters_needed
for (auto it = watchers_.begin(); it != watchers_.end();) {
const std::string& cluster_name = it->first;
if (clusters_needed.find(cluster_name) != clusters_needed.end()) {
++it;
continue;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
cluster_name.c_str());
}
CancelClusterDataWatch(cluster_name, it->second.watcher,
/*delay_unsubscription=*/false);
it = watchers_.erase(it);
}
child_policy_->UpdateLocked(std::move(args));
}
void CdsLb::OnError(grpc_error* error) {
void CdsLb::OnError(const std::string& name, grpc_error* error) {
gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s",
this, config_->cluster().c_str(), grpc_error_string(error));
this, name.c_str(), grpc_error_string(error));
// Go into TRANSIENT_FAILURE if we have not yet created the child
// policy (i.e., we have not yet received data from xds). Otherwise,
// we keep running with the data we had previously.
@ -426,11 +540,11 @@ void CdsLb::OnError(grpc_error* error) {
}
}
void CdsLb::OnResourceDoesNotExist() {
void CdsLb::OnResourceDoesNotExist(const std::string& name) {
gpr_log(GPR_ERROR,
"[cdslb %p] CDS resource for %s does not exist -- reporting "
"TRANSIENT_FAILURE",
this, config_->cluster().c_str());
this, name.c_str());
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("CDS resource \"", config_->cluster(),
@ -444,7 +558,7 @@ void CdsLb::OnResourceDoesNotExist() {
}
grpc_error* CdsLb::UpdateXdsCertificateProvider(
const XdsApi::CdsUpdate& cluster_data) {
const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data) {
// Early out if channel is not configured to use xds security.
grpc_channel_credentials* channel_credentials =
grpc_channel_credentials_find_in_args(args_);
@ -490,7 +604,7 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider(
root_certificate_provider_ = std::move(new_root_provider);
}
xds_certificate_provider_->UpdateRootCertNameAndDistributor(
config_->cluster(), root_provider_cert_name,
cluster_name, root_provider_cert_name,
root_certificate_provider_ == nullptr
? nullptr
: root_certificate_provider_->distributor());
@ -528,7 +642,7 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider(
identity_certificate_provider_ = std::move(new_identity_provider);
}
xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
config_->cluster(), identity_provider_cert_name,
cluster_name, identity_provider_cert_name,
identity_certificate_provider_ == nullptr
? nullptr
: identity_certificate_provider_->distributor());
@ -537,10 +651,24 @@ grpc_error* CdsLb::UpdateXdsCertificateProvider(
cluster_data.common_tls_context.combined_validation_context
.default_validation_context.match_subject_alt_names;
xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
config_->cluster(), match_subject_alt_names);
cluster_name, match_subject_alt_names);
return GRPC_ERROR_NONE;
}
void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
XdsClient::ClusterWatcherInterface* watcher,
bool delay_unsubscription) {
if (xds_certificate_provider_ != nullptr) {
std::string name(cluster_name);
xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "",
nullptr);
xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "",
nullptr);
xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
}
xds_client_->CancelClusterDataWatch(cluster_name, watcher,
delay_unsubscription);
}
//
// factory
//
@ -575,6 +703,7 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
return nullptr;
}
std::vector<grpc_error*> error_list;
// cluster name.
std::string cluster;
auto it = json.object_value().find("cluster");
if (it == json.object_value().end()) {

@ -21,4 +21,9 @@
// Set by xds_cluster_impl LB policy and used by GoogleDefaultCredentials.
#define GRPC_ARG_XDS_CLUSTER_NAME "grpc.internal.xds_cluster_name"
// For testing purpose, this channel arg indicating xds_cluster_resolver LB
// policy should use the fake DNS resolver to resolve logical dns cluster.
#define GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR \
"grpc.internal.xds_logical_dns_cluster_fake_resolver_response_generator"
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_ARGS_H

@ -29,8 +29,10 @@
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/xds_channel_args.h"
@ -469,11 +471,26 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
//
void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
std::string target = parent()->server_name_;
grpc_channel_args* args = nullptr;
FakeResolverResponseGenerator* fake_resolver_response_generator =
grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
parent()->args_,
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
if (fake_resolver_response_generator != nullptr) {
target = absl::StrCat("fake:", target);
grpc_arg new_arg = FakeResolverResponseGenerator::MakeChannelArg(
fake_resolver_response_generator);
args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1);
} else {
args = grpc_channel_args_copy(parent()->args_);
}
resolver_ = ResolverRegistry::CreateResolver(
parent()->server_name_.c_str(), parent()->args_,
grpc_pollset_set_create(), parent()->work_serializer(),
target.c_str(), args, parent()->interested_parties(),
parent()->work_serializer(),
absl::make_unique<ResolverResultHandler>(
Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism")));
grpc_channel_args_destroy(args);
if (resolver_ == nullptr) {
parent()->OnResourceDoesNotExist(index());
return;
@ -509,9 +526,11 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
XdsApi::EdsUpdate update;
XdsApi::EdsUpdate::Priority::Locality locality;
locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
locality.lb_weight = 1;
locality.endpoints = std::move(result.addresses);
update.priorities[0].localities.emplace(locality.name.get(),
std::move(locality));
XdsApi::EdsUpdate::Priority priority;
priority.localities.emplace(locality.name.get(), std::move(locality));
update.priorities.emplace_back(std::move(priority));
discovery_mechanism_->parent()->OnEndpointChanged(
discovery_mechanism_->index(), std::move(update));
}
@ -825,23 +844,13 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
Json::Object priority_children;
Json::Array priority_priorities;
// Setting up index to iterate through the discovery mechanisms and keeping
// track the discovery_mechanism each prioirty belongs to.
// track the discovery_mechanism each priority belongs to.
size_t discovery_index = 0;
// Setting up num_priorities_remaining to track the priorities in each
// discovery_mechanism.
size_t num_priorities_remaining_in_discovery =
discovery_mechanisms_[discovery_index].num_priorities;
for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
// Each prioirty in the priority_list_ should correspond to a priority in a
// discovery mechanism in discovery_mechanisms_ (both in the same order).
// Keeping track of the discovery_mechanism each prioirty belongs to.
if (num_priorities_remaining_in_discovery == 0) {
++discovery_index;
num_priorities_remaining_in_discovery =
discovery_mechanisms_[discovery_index].num_priorities;
} else {
--num_priorities_remaining_in_discovery;
}
const auto& localities = priority_list_[priority].localities;
Json::Object weighted_targets;
for (const auto& p : localities) {
@ -913,6 +922,16 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
{"config", std::move(locality_picking_policy)},
{"ignore_reresolution_requests", true},
};
// Each priority in the priority_list_ should correspond to a priority in a
// discovery mechanism in discovery_mechanisms_ (both in the same order).
// Keeping track of the discovery_mechanism each priority belongs to.
--num_priorities_remaining_in_discovery;
while (num_priorities_remaining_in_discovery == 0 &&
discovery_index < discovery_mechanisms_.size() - 1) {
++discovery_index;
num_priorities_remaining_in_discovery =
discovery_mechanisms_[discovery_index].num_priorities;
}
}
// There should be matching number of priorities in discovery_mechanisms_ and
// in priority_list_; therefore at the end of looping through all the

@ -0,0 +1,29 @@
/* This file was generated by upbc (the upb compiler) from the input
* file:
*
* envoy/extensions/clusters/aggregate/v3/cluster.proto
*
* Do not edit -- your changes will be discarded when the file is
* regenerated. */
#include <stddef.h>
#include "upb/msg.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
#include "udpa/annotations/status.upb.h"
#include "udpa/annotations/versioning.upb.h"
#include "validate/validate.upb.h"
#include "upb/port_def.inc"
static const upb_msglayout_field envoy_extensions_clusters_aggregate_v3_ClusterConfig__fields[1] = {
{1, UPB_SIZE(0, 0), 0, 0, 9, 3},
};
const upb_msglayout envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit = {
NULL,
&envoy_extensions_clusters_aggregate_v3_ClusterConfig__fields[0],
UPB_SIZE(8, 8), 1, false, 255,
};
#include "upb/port_undef.inc"

@ -0,0 +1,67 @@
/* This file was generated by upbc (the upb compiler) from the input
* file:
*
* envoy/extensions/clusters/aggregate/v3/cluster.proto
*
* Do not edit -- your changes will be discarded when the file is
* regenerated. */
#ifndef ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPB_H_
#define ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPB_H_
#include "upb/msg.h"
#include "upb/decode.h"
#include "upb/decode_fast.h"
#include "upb/encode.h"
#include "upb/port_def.inc"
#ifdef __cplusplus
extern "C" {
#endif
struct envoy_extensions_clusters_aggregate_v3_ClusterConfig;
typedef struct envoy_extensions_clusters_aggregate_v3_ClusterConfig envoy_extensions_clusters_aggregate_v3_ClusterConfig;
extern const upb_msglayout envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit;
/* envoy.extensions.clusters.aggregate.v3.ClusterConfig */
UPB_INLINE envoy_extensions_clusters_aggregate_v3_ClusterConfig *envoy_extensions_clusters_aggregate_v3_ClusterConfig_new(upb_arena *arena) {
return (envoy_extensions_clusters_aggregate_v3_ClusterConfig *)_upb_msg_new(&envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena);
}
UPB_INLINE envoy_extensions_clusters_aggregate_v3_ClusterConfig *envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(const char *buf, size_t size,
upb_arena *arena) {
envoy_extensions_clusters_aggregate_v3_ClusterConfig *ret = envoy_extensions_clusters_aggregate_v3_ClusterConfig_new(arena);
return (ret && upb_decode(buf, size, ret, &envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena)) ? ret : NULL;
}
UPB_INLINE envoy_extensions_clusters_aggregate_v3_ClusterConfig *envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse_ex(const char *buf, size_t size,
upb_arena *arena, int options) {
envoy_extensions_clusters_aggregate_v3_ClusterConfig *ret = envoy_extensions_clusters_aggregate_v3_ClusterConfig_new(arena);
return (ret && _upb_decode(buf, size, ret, &envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena, options))
? ret : NULL;
}
UPB_INLINE char *envoy_extensions_clusters_aggregate_v3_ClusterConfig_serialize(const envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, upb_arena *arena, size_t *len) {
return upb_encode(msg, &envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit, arena, len);
}
UPB_INLINE upb_strview const* envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(const envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, size_t *len) { return (upb_strview const*)_upb_array_accessor(msg, UPB_SIZE(0, 0), len); }
UPB_INLINE upb_strview* envoy_extensions_clusters_aggregate_v3_ClusterConfig_mutable_clusters(envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, size_t *len) {
return (upb_strview*)_upb_array_mutable_accessor(msg, UPB_SIZE(0, 0), len);
}
UPB_INLINE upb_strview* envoy_extensions_clusters_aggregate_v3_ClusterConfig_resize_clusters(envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, size_t len, upb_arena *arena) {
return (upb_strview*)_upb_array_resize_accessor2(msg, UPB_SIZE(0, 0), len, UPB_SIZE(3, 4), arena);
}
UPB_INLINE bool envoy_extensions_clusters_aggregate_v3_ClusterConfig_add_clusters(envoy_extensions_clusters_aggregate_v3_ClusterConfig *msg, upb_strview val, upb_arena *arena) {
return _upb_array_append_accessor2(msg, UPB_SIZE(0, 0), UPB_SIZE(3, 4), &val,
arena);
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#include "upb/port_undef.inc"
#endif /* ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPB_H_ */

@ -0,0 +1,51 @@
/* This file was generated by upbc (the upb compiler) from the input
* file:
*
* envoy/extensions/clusters/aggregate/v3/cluster.proto
*
* Do not edit -- your changes will be discarded when the file is
* regenerated. */
#include "upb/def.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h"
extern upb_def_init udpa_annotations_status_proto_upbdefinit;
extern upb_def_init udpa_annotations_versioning_proto_upbdefinit;
extern upb_def_init validate_validate_proto_upbdefinit;
extern const upb_msglayout envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit;
static const upb_msglayout *layouts[1] = {
&envoy_extensions_clusters_aggregate_v3_ClusterConfig_msginit,
};
static const char descriptor[389] = {'\n', '4', 'e', 'n', 'v', 'o', 'y', '/', 'e', 'x', 't', 'e', 'n', 's', 'i', 'o', 'n', 's', '/', 'c', 'l', 'u', 's', 't', 'e',
'r', 's', '/', 'a', 'g', 'g', 'r', 'e', 'g', 'a', 't', 'e', '/', 'v', '3', '/', 'c', 'l', 'u', 's', 't', 'e', 'r', '.', 'p',
'r', 'o', 't', 'o', '\022', '&', 'e', 'n', 'v', 'o', 'y', '.', 'e', 'x', 't', 'e', 'n', 's', 'i', 'o', 'n', 's', '.', 'c', 'l',
'u', 's', 't', 'e', 'r', 's', '.', 'a', 'g', 'g', 'r', 'e', 'g', 'a', 't', 'e', '.', 'v', '3', '\032', '\035', 'u', 'd', 'p', 'a',
'/', 'a', 'n', 'n', 'o', 't', 'a', 't', 'i', 'o', 'n', 's', '/', 's', 't', 'a', 't', 'u', 's', '.', 'p', 'r', 'o', 't', 'o',
'\032', '!', 'u', 'd', 'p', 'a', '/', 'a', 'n', 'n', 'o', 't', 'a', 't', 'i', 'o', 'n', 's', '/', 'v', 'e', 'r', 's', 'i', 'o',
'n', 'i', 'n', 'g', '.', 'p', 'r', 'o', 't', 'o', '\032', '\027', 'v', 'a', 'l', 'i', 'd', 'a', 't', 'e', '/', 'v', 'a', 'l', 'i',
'd', 'a', 't', 'e', '.', 'p', 'r', 'o', 't', 'o', '\"', 'r', '\n', '\r', 'C', 'l', 'u', 's', 't', 'e', 'r', 'C', 'o', 'n', 'f',
'i', 'g', '\022', '$', '\n', '\010', 'c', 'l', 'u', 's', 't', 'e', 'r', 's', '\030', '\001', ' ', '\003', '(', '\t', 'B', '\010', '\372', 'B', '\005',
'\222', '\001', '\002', '\010', '\001', 'R', '\010', 'c', 'l', 'u', 's', 't', 'e', 'r', 's', ':', ';', '\232', '\305', '\210', '\036', '6', '\n', '4', 'e',
'n', 'v', 'o', 'y', '.', 'c', 'o', 'n', 'f', 'i', 'g', '.', 'c', 'l', 'u', 's', 't', 'e', 'r', '.', 'a', 'g', 'g', 'r', 'e',
'g', 'a', 't', 'e', '.', 'v', '2', 'a', 'l', 'p', 'h', 'a', '.', 'C', 'l', 'u', 's', 't', 'e', 'r', 'C', 'o', 'n', 'f', 'i',
'g', 'B', 'N', '\n', '4', 'i', 'o', '.', 'e', 'n', 'v', 'o', 'y', 'p', 'r', 'o', 'x', 'y', '.', 'e', 'n', 'v', 'o', 'y', '.',
'e', 'x', 't', 'e', 'n', 's', 'i', 'o', 'n', 's', '.', 'c', 'l', 'u', 's', 't', 'e', 'r', 's', '.', 'a', 'g', 'g', 'r', 'e',
'g', 'a', 't', 'e', '.', 'v', '3', 'B', '\014', 'C', 'l', 'u', 's', 't', 'e', 'r', 'P', 'r', 'o', 't', 'o', 'P', '\001', '\272', '\200',
'\310', '\321', '\006', '\002', '\020', '\002', 'b', '\006', 'p', 'r', 'o', 't', 'o', '3',
};
static upb_def_init *deps[4] = {
&udpa_annotations_status_proto_upbdefinit,
&udpa_annotations_versioning_proto_upbdefinit,
&validate_validate_proto_upbdefinit,
NULL
};
upb_def_init envoy_extensions_clusters_aggregate_v3_cluster_proto_upbdefinit = {
deps,
layouts,
"envoy/extensions/clusters/aggregate/v3/cluster.proto",
UPB_STRVIEW_INIT(descriptor, 389)
};

@ -0,0 +1,35 @@
/* This file was generated by upbc (the upb compiler) from the input
* file:
*
* envoy/extensions/clusters/aggregate/v3/cluster.proto
*
* Do not edit -- your changes will be discarded when the file is
* regenerated. */
#ifndef ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPBDEFS_H_
#define ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPBDEFS_H_
#include "upb/def.h"
#include "upb/port_def.inc"
#ifdef __cplusplus
extern "C" {
#endif
#include "upb/def.h"
#include "upb/port_def.inc"
extern upb_def_init envoy_extensions_clusters_aggregate_v3_cluster_proto_upbdefinit;
UPB_INLINE const upb_msgdef *envoy_extensions_clusters_aggregate_v3_ClusterConfig_getmsgdef(upb_symtab *s) {
_upb_symtab_loaddefinit(s, &envoy_extensions_clusters_aggregate_v3_cluster_proto_upbdefinit);
return upb_symtab_lookupmsg(s, "envoy.extensions.clusters.aggregate.v3.ClusterConfig");
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#include "upb/port_undef.inc"
#endif /* ENVOY_EXTENSIONS_CLUSTERS_AGGREGATE_V3_CLUSTER_PROTO_UPBDEFS_H_ */

@ -62,6 +62,7 @@
#include "envoy/config/route/v3/route.upb.h"
#include "envoy/config/route/v3/route.upbdefs.h"
#include "envoy/config/route/v3/route_components.upb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h"
#include "envoy/extensions/transport_sockets/tls/v3/common.upb.h"
#include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
@ -101,6 +102,19 @@ bool XdsTimeoutEnabled() {
return parse_succeeded && parsed_value;
}
// TODO (donnadionne): Check to see if cluster types aggregate_cluster and
// logical_dns are enabled, this will be
// removed once the cluster types are fully integration-tested and enabled by
// default.
bool XdsAggregateAndLogicalDnsClusterEnabled() {
char* value = gpr_getenv(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
gpr_free(value);
return parse_succeeded && parsed_value;
}
// TODO(yashykt): Check to see if xDS security is enabled. This will be
// removed once this feature is fully integration-tested and enabled by
// default.
@ -1608,29 +1622,79 @@ grpc_error* CdsResponseParse(
}
XdsApi::CdsUpdate& cds_update = (*cds_update_map)[std::move(cluster_name)];
// Check the cluster_discovery_type.
if (!envoy_config_cluster_v3_Cluster_has_type(cluster)) {
if (!envoy_config_cluster_v3_Cluster_has_type(cluster) &&
!envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.");
}
if (envoy_config_cluster_v3_Cluster_type(cluster) !=
if (envoy_config_cluster_v3_Cluster_type(cluster) ==
envoy_config_cluster_v3_Cluster_EDS) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not EDS.");
}
// Check the EDS config source.
const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config =
envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster);
const envoy_config_core_v3_ConfigSource* eds_config =
envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config(
eds_cluster_config);
if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) {
cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::EDS;
// Check the EDS config source.
const envoy_config_cluster_v3_Cluster_EdsClusterConfig*
eds_cluster_config =
envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster);
const envoy_config_core_v3_ConfigSource* eds_config =
envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config(
eds_cluster_config);
if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"EDS ConfigSource is not ADS.");
}
// Record EDS service_name (if any).
upb_strview service_name =
envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name(
eds_cluster_config);
if (service_name.size != 0) {
cds_update.eds_service_name = UpbStringToStdString(service_name);
}
} else if (!XdsAggregateAndLogicalDnsClusterEnabled()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"EDS ConfigSource is not ADS.");
}
// Record EDS service_name (if any).
upb_strview service_name =
envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name(
eds_cluster_config);
if (service_name.size != 0) {
cds_update.eds_service_name = UpbStringToStdString(service_name);
"DiscoveryType is not valid.");
} else if (envoy_config_cluster_v3_Cluster_type(cluster) ==
envoy_config_cluster_v3_Cluster_LOGICAL_DNS) {
cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS;
} else {
if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
const envoy_config_cluster_v3_Cluster_CustomClusterType*
custom_cluster_type =
envoy_config_cluster_v3_Cluster_cluster_type(cluster);
upb_strview type_name =
envoy_config_cluster_v3_Cluster_CustomClusterType_name(
custom_cluster_type);
if (UpbStringToAbsl(type_name) == "envoy.clusters.aggregate") {
cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::AGGREGATE;
// Retrieve aggregate clusters.
const google_protobuf_Any* typed_config =
envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config(
custom_cluster_type);
const upb_strview aggregate_cluster_config_upb_strview =
google_protobuf_Any_value(typed_config);
const envoy_extensions_clusters_aggregate_v3_ClusterConfig*
aggregate_cluster_config =
envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(
aggregate_cluster_config_upb_strview.data,
aggregate_cluster_config_upb_strview.size, arena);
if (aggregate_cluster_config == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Can't parse aggregate cluster.");
}
size_t size;
const upb_strview* clusters =
envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(
aggregate_cluster_config, &size);
for (size_t i = 0; i < size; ++i) {
const upb_strview cluster = clusters[i];
cds_update.prioritized_cluster_names.emplace_back(
UpbStringToStdString(cluster));
}
} else {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"DiscoveryType is not valid.");
}
} else {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"DiscoveryType is not valid.");
}
}
// Check the LB policy.
if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) !=

@ -229,6 +229,9 @@ class XdsApi {
using RdsUpdateMap = std::map<std::string /*route_config_name*/, RdsUpdate>;
struct CdsUpdate {
enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE };
ClusterType cluster_type;
// For cluster type EDS.
// The name to use in the EDS request.
// If empty, the cluster name will be used.
std::string eds_service_name;
@ -242,12 +245,17 @@ class XdsApi {
// Maximum number of outstanding requests can be made to the upstream
// cluster.
uint32_t max_concurrent_requests = 1024;
// For cluster type AGGREGATE.
// The prioritized list of cluster names.
std::vector<std::string> prioritized_cluster_names;
bool operator==(const CdsUpdate& other) const {
return eds_service_name == other.eds_service_name &&
return cluster_type == other.cluster_type &&
eds_service_name == other.eds_service_name &&
common_tls_context == other.common_tls_context &&
lrs_load_reporting_server_name ==
other.lrs_load_reporting_server_name &&
prioritized_cluster_names == other.prioritized_cluster_names &&
max_concurrent_requests == other.max_concurrent_requests;
}

@ -72,6 +72,15 @@ message CircuitBreakers {
repeated Thresholds thresholds = 1;
}
message ClusterConfig {
repeated string clusters = 1;
}
message CustomClusterType {
string name = 1;
ClusterConfig typed_config = 2;
}
message Cluster {
// Refer to :ref:`service discovery type <arch_overview_service_discovery_types>`
// for an explanation on each type.
@ -106,6 +115,9 @@ message Cluster {
// The :ref:`service discovery type <arch_overview_service_discovery_types>`
// to use for resolving the cluster.
DiscoveryType type = 2;
// The custom cluster type: aggregate cluster in this case.
CustomClusterType cluster_type = 38;
}
// Only valid when discovery type is EDS.

@ -204,6 +204,17 @@ grpc_proto_library(
],
)
grpc_proto_library(
name = "aggregate_cluster_proto",
srcs = [
"aggregate_cluster.proto",
],
well_known_protos = True,
deps = [
"string_proto",
],
)
grpc_proto_library(
name = "tls_proto",
srcs = [

@ -0,0 +1,30 @@
// Copyright 2020 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Local copy of Envoy xDS proto file, used for testing only.
syntax = "proto3";
package envoy.extensions.clusters.aggregate.v3;
import "src/proto/grpc/testing/xds/v3/string.proto";
// Configuration for the aggregate cluster. See the :ref:`architecture overview
// <arch_overview_aggregate_cluster>` for more information.
// [#extension: envoy.clusters.aggregate]
message ClusterConfig {
// Load balancing clusters in aggregate cluster. Clusters are prioritized based on the order they
// appear in this list.
repeated string clusters = 1;
}

@ -21,6 +21,7 @@ package envoy.config.cluster.v3;
import "src/proto/grpc/testing/xds/v3/base.proto";
import "src/proto/grpc/testing/xds/v3/config_source.proto";
import "google/protobuf/any.proto";
import "google/protobuf/wrappers.proto";
enum RoutingPriority {
@ -36,6 +37,16 @@ message CircuitBreakers {
repeated Thresholds thresholds = 1;
}
// Extended cluster type.
message CustomClusterType {
// The type of the cluster to instantiate. The name must match a supported cluster type.
string name = 1;
// Cluster specific configuration which depends on the cluster being instantiated.
// See the supported cluster for further documentation.
google.protobuf.Any typed_config = 2;
}
// [#protodoc-title: Cluster configuration]
// Configuration for a single upstream cluster.
@ -134,6 +145,9 @@ message Cluster {
// The :ref:`service discovery type <arch_overview_service_discovery_types>`
// to use for resolving the cluster.
DiscoveryType type = 2;
// The custom cluster type.
CustomClusterType cluster_type = 38;
}
// Configuration to use for EDS updates for the Cluster.

@ -151,6 +151,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/upb-generated/envoy/config/route/v3/route_components.upb.c',
'src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c',
'src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c',
'src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c',
'src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c',
'src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/common.upb.c',
@ -237,6 +238,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/upbdefs-generated/envoy/config/route/v3/route_components.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c',
'src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/common.upbdefs.c',

@ -536,6 +536,7 @@ grpc_cc_test(
"//src/proto/grpc/testing/xds:lds_rds_for_test_proto",
"//src/proto/grpc/testing/xds:lrs_for_test_proto",
"//src/proto/grpc/testing/xds/v3:ads_proto",
"//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto",
"//src/proto/grpc/testing/xds/v3:cluster_proto",
"//src/proto/grpc/testing/xds/v3:discovery_proto",
"//src/proto/grpc/testing/xds/v3:endpoint_proto",

@ -49,6 +49,7 @@
#include <grpcpp/xds_server_builder.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/certificate_provider_registry.h"
@ -81,6 +82,7 @@
#include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
@ -98,11 +100,13 @@ using std::chrono::system_clock;
using ::envoy::config::cluster::v3::CircuitBreakers;
using ::envoy::config::cluster::v3::Cluster;
using ::envoy::config::cluster::v3::CustomClusterType;
using ::envoy::config::cluster::v3::RoutingPriority;
using ::envoy::config::endpoint::v3::ClusterLoadAssignment;
using ::envoy::config::endpoint::v3::HealthStatus;
using ::envoy::config::listener::v3::Listener;
using ::envoy::config::route::v3::RouteConfiguration;
using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;
using ::envoy::extensions::filters::network::http_connection_manager::v3::
HttpConnectionManager;
using ::envoy::extensions::transport_sockets::tls::v3::DownstreamTlsContext;
@ -1498,6 +1502,28 @@ std::shared_ptr<ChannelCredentials> CreateTlsFallbackCredentials() {
return channel_creds;
}
namespace {
void* response_generator_arg_copy(void* p) {
auto* generator = static_cast<grpc_core::FakeResolverResponseGenerator*>(p);
generator->Ref().release();
return p;
}
void response_generator_arg_destroy(void* p) {
auto* generator = static_cast<grpc_core::FakeResolverResponseGenerator*>(p);
generator->Unref();
}
int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
const grpc_arg_pointer_vtable
kLogicalDnsClusterResolverResponseGeneratorVtable = {
response_generator_arg_copy, response_generator_arg_destroy,
response_generator_cmp};
} // namespace
class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
protected:
XdsEnd2endTest(size_t num_backends, size_t num_balancers,
@ -1535,6 +1561,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
xds_channel_args_to_add_.emplace_back(
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
lb_channel_response_generator_.get()));
// Inject xDS logical cluster resolver response generator.
logical_dns_cluster_resolver_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
if (xds_resource_does_not_exist_timeout_ms_ > 0) {
xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS),
@ -1640,6 +1669,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator);
}
args.SetPointerWithVtable(
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
logical_dns_cluster_resolver_response_generator_.get(),
&kLogicalDnsClusterResolverResponseGeneratorVtable);
std::string uri = absl::StrCat(
GetParam().use_xds_resolver() ? "xds" : "fake", ":///", server_name);
std::shared_ptr<ChannelCredentials> channel_creds =
@ -2270,6 +2303,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
response_generator_;
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
lb_channel_response_generator_;
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
logical_dns_cluster_resolver_response_generator_;
int xds_resource_does_not_exist_timeout_ms_ = 0;
absl::InlinedVector<grpc_arg, 2> xds_channel_args_to_add_;
grpc_channel_args xds_channel_args_;
@ -5300,9 +5335,172 @@ TEST_P(CdsTest, Vanilla) {
AdsServiceImpl::ResponseState::ACKED);
}
TEST_P(CdsTest, LogicalDNSClusterType) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER",
"true");
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Create Logical DNS Cluster
auto cluster = default_cluster_;
cluster.set_type(Cluster::LOGICAL_DNS);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Set Logical DNS result
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2));
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Wait for traffic to go to backend 1.
WaitForBackend(1);
gpr_unsetenv(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
}
TEST_P(CdsTest, AggregateClusterType) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER",
"true");
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args1({
{"locality0", GetBackendPorts(1, 2)},
});
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(2, 3)},
});
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args1, kNewEdsService1Name));
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args2, kNewEdsService2Name));
// Populate new CDS resources.
Cluster new_cluster1 = default_cluster_;
new_cluster1.set_name(kNewCluster1Name);
new_cluster1.mutable_eds_cluster_config()->set_service_name(
kNewEdsService1Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = default_cluster_;
new_cluster2.set_name(kNewCluster2Name);
new_cluster2.mutable_eds_cluster_config()->set_service_name(
kNewEdsService2Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster2);
// Create Aggregate Cluster
auto cluster = default_cluster_;
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kNewCluster1Name);
cluster_config.add_clusters(kNewCluster2Name);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Wait for traffic to go to backend 1.
WaitForBackend(1);
// Shutdown backend 1 and wait for all traffic to go to backend 2.
ShutdownBackend(1);
WaitForBackend(2);
EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state().state,
AdsServiceImpl::ResponseState::ACKED);
gpr_unsetenv(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
}
TEST_P(CdsTest, AggregateClusterMixedType) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER",
"true");
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
const char* kLogicalDNSClusterName = "logical_dns_cluster";
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(2, 3)},
});
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args2, kNewEdsService2Name));
// Populate new CDS resources.
Cluster new_cluster2 = default_cluster_;
new_cluster2.set_name(kNewCluster2Name);
new_cluster2.mutable_eds_cluster_config()->set_service_name(
kNewEdsService2Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster2);
// Create Logical DNS Cluster
auto logical_dns_cluster = default_cluster_;
logical_dns_cluster.set_name(kLogicalDNSClusterName);
logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
balancers_[0]->ads_service()->SetCdsResource(logical_dns_cluster);
// Create Aggregate Cluster
auto cluster = default_cluster_;
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kLogicalDNSClusterName);
cluster_config.add_clusters(kNewCluster2Name);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Set Logical DNS result
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2));
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Wait for traffic to go to backend 1.
WaitForBackend(1);
// Shutdown backend 1 and wait for all traffic to go to backend 2.
ShutdownBackend(1);
WaitForBackend(2);
EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state().state,
AdsServiceImpl::ResponseState::ACKED);
gpr_unsetenv(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
}
// Test that CDS client should send a NACK if cluster type is Logical DNS but
// the feature is not yet supported.
TEST_P(CdsTest, LogicalDNSClusterTypeDisabled) {
auto cluster = default_cluster_;
cluster.set_type(Cluster::LOGICAL_DNS);
balancers_[0]->ads_service()->SetCdsResource(cluster);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
const auto& response_state =
balancers_[0]->ads_service()->cds_response_state();
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(response_state.error_message, "DiscoveryType is not valid.");
}
// Test that CDS client should send a NACK if cluster type is AGGREGATE but
// the feature is not yet supported.
TEST_P(CdsTest, AggregateClusterTypeDisabled) {
auto cluster = default_cluster_;
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters("cluster1");
cluster_config.add_clusters("cluster2");
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
cluster.set_type(Cluster::LOGICAL_DNS);
balancers_[0]->ads_service()->SetCdsResource(cluster);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
const auto& response_state =
balancers_[0]->ads_service()->cds_response_state();
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(response_state.error_message, "DiscoveryType is not valid.");
}
// Tests that CDS client should send a NACK if the cluster type in CDS response
// is other than EDS.
TEST_P(CdsTest, WrongClusterType) {
// is unsupported.
TEST_P(CdsTest, UnsupportedClusterType) {
auto cluster = default_cluster_;
cluster.set_type(Cluster::STATIC);
balancers_[0]->ads_service()->SetCdsResource(cluster);
@ -5312,7 +5510,7 @@ TEST_P(CdsTest, WrongClusterType) {
const auto& response_state =
balancers_[0]->ads_service()->cds_response_state();
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(response_state.error_message, "DiscoveryType is not EDS.");
EXPECT_EQ(response_state.error_message, "DiscoveryType is not valid.");
}
// Tests that CDS client should send a NACK if the eds_config in CDS response is

@ -75,6 +75,7 @@ proto_files=( \
"envoy/config/route/v3/route_components.proto" \
"envoy/config/route/v3/scoped_route.proto" \
"envoy/config/trace/v3/http_tracer.proto" \
"envoy/extensions/clusters/aggregate/v3/cluster.proto" \
"envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto" \
"envoy/extensions/transport_sockets/tls/v3/cert.proto" \
"envoy/extensions/transport_sockets/tls/v3/common.proto" \

@ -1289,6 +1289,8 @@ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \
src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h \
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h \
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h \
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h \
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \
@ -1459,6 +1461,8 @@ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \

@ -1125,6 +1125,8 @@ src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.c \
src/core/ext/upb-generated/envoy/config/route/v3/scoped_route.upb.h \
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.c \
src/core/ext/upb-generated/envoy/config/trace/v3/http_tracer.upb.h \
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.c \
src/core/ext/upb-generated/envoy/extensions/clusters/aggregate/v3/cluster.upb.h \
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.c \
src/core/ext/upb-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h \
src/core/ext/upb-generated/envoy/extensions/transport_sockets/tls/v3/cert.upb.c \
@ -1295,6 +1297,8 @@ src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/route/v3/scoped_route.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/config/trace/v3/http_tracer.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.c \
src/core/ext/upbdefs-generated/envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h \
src/core/ext/upbdefs-generated/envoy/extensions/transport_sockets/tls/v3/cert.upbdefs.c \

Loading…
Cancel
Save