Support RDS updates on the server (#27851)

* Port changes from #27388

* Reviewer comments

* Fix resource timeout issue

* Cleanup

* Fix clang-tidy

* Revert benchmark

* Restructure

* clang-tidy

* Automated change: Fix sanity tests

* Partial commit

* Reviewer comments

* Fixes

* Reviewer comments

* Reviewer comments

* Reviewer comments

* Reviewer comments

* clang-format

* Fix FaultInjection tests

* clang-tidy

Co-authored-by: yashykt <yashykt@users.noreply.github.com>
pull/28144/head^2
Yash Tibrewal 3 years ago committed by GitHub
parent 11f440775b
commit 25446c468a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 4
      CMakeLists.txt
  3. 6
      Makefile
  4. 10
      build_autogenerated.yaml
  5. 4
      config.m4
  6. 4
      config.w32
  7. 6
      gRPC-C++.podspec
  8. 9
      gRPC-Core.podspec
  9. 6
      grpc.gemspec
  10. 3
      grpc.gyp
  11. 6
      package.xml
  12. 363
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  13. 102
      src/core/ext/xds/xds_api.cc
  14. 3
      src/core/ext/xds/xds_api.h
  15. 247
      src/core/ext/xds/xds_routing.cc
  16. 98
      src/core/ext/xds/xds_routing.h
  17. 1231
      src/core/ext/xds/xds_server_config_fetcher.cc
  18. 4
      src/core/lib/surface/server.h
  19. 3
      src/python/grpcio/grpc_core_dependencies.py
  20. 160
      test/cpp/end2end/xds/xds_end2end_test.cc
  21. 6
      tools/doxygen/Doxyfile.c++.internal
  22. 6
      tools/doxygen/Doxyfile.core.internal

@ -2556,6 +2556,7 @@ grpc_cc_library(
"src/core/ext/xds/xds_client_stats.cc",
"src/core/ext/xds/xds_http_fault_filter.cc",
"src/core/ext/xds/xds_http_filters.cc",
"src/core/ext/xds/xds_routing.cc",
"src/core/lib/security/credentials/xds/xds_credentials.cc",
],
hdrs = [
@ -2571,6 +2572,7 @@ grpc_cc_library(
"src/core/ext/xds/xds_client_stats.h",
"src/core/ext/xds/xds_http_fault_filter.h",
"src/core/ext/xds/xds_http_filters.h",
"src/core/ext/xds/xds_routing.h",
"src/core/lib/security/credentials/xds/xds_credentials.h",
],
external_deps = [
@ -2644,8 +2646,11 @@ grpc_cc_library(
deps = [
"gpr_base",
"grpc_base",
"grpc_server_config_selector",
"grpc_server_config_selector_filter",
"grpc_xds_channel_stack_modifier",
"grpc_xds_client",
"slice_refcount",
],
)

4
CMakeLists.txt generated

@ -1568,6 +1568,8 @@ add_library(grpc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/max_age/max_age_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/server_config_selector/server_config_selector.cc
src/core/ext/filters/server_config_selector/server_config_selector_filter.cc
src/core/ext/service_config/service_config.cc
src/core/ext/service_config/service_config_parser.cc
src/core/ext/transport/chttp2/alpn/alpn.cc
@ -1803,6 +1805,7 @@ add_library(grpc
src/core/ext/xds/xds_client_stats.cc
src/core/ext/xds/xds_http_fault_filter.cc
src/core/ext/xds/xds_http_filters.cc
src/core/ext/xds/xds_routing.cc
src/core/ext/xds/xds_server_config_fetcher.cc
src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc
@ -14764,7 +14767,6 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(server_config_selector_test
src/core/ext/filters/server_config_selector/server_config_selector.cc
test/core/server_config_selector/server_config_selector_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc

6
Makefile generated

@ -1119,6 +1119,8 @@ LIBGRPC_SRC = \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/server_config_selector/server_config_selector.cc \
src/core/ext/filters/server_config_selector/server_config_selector_filter.cc \
src/core/ext/service_config/service_config.cc \
src/core/ext/service_config/service_config_parser.cc \
src/core/ext/transport/chttp2/alpn/alpn.cc \
@ -1354,6 +1356,7 @@ LIBGRPC_SRC = \
src/core/ext/xds/xds_client_stats.cc \
src/core/ext/xds/xds_http_fault_filter.cc \
src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_routing.cc \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/address_utils/parse_address.cc \
src/core/lib/address_utils/sockaddr_utils.cc \
@ -2727,6 +2730,8 @@ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPEN
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP)
src/core/ext/filters/server_config_selector/server_config_selector.cc: $(OPENSSL_DEP)
src/core/ext/filters/server_config_selector/server_config_selector_filter.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc: $(OPENSSL_DEP)
src/core/ext/upb-generated/envoy/admin/v3/config_dump.upb.c: $(OPENSSL_DEP)
@ -2923,6 +2928,7 @@ src/core/ext/xds/xds_client.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_client_stats.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_http_fault_filter.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_http_filters.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_routing.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_server_config_fetcher.cc: $(OPENSSL_DEP)
src/core/lib/http/httpcli_security_connector.cc: $(OPENSSL_DEP)
src/core/lib/matchers/matchers.cc: $(OPENSSL_DEP)

@ -491,6 +491,8 @@ libs:
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/max_age/max_age_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
- src/core/ext/filters/server_config_selector/server_config_selector.h
- src/core/ext/filters/server_config_selector/server_config_selector_filter.h
- src/core/ext/service_config/service_config.h
- src/core/ext/service_config/service_config_call_data.h
- src/core/ext/service_config/service_config_parser.h
@ -723,6 +725,7 @@ libs:
- src/core/ext/xds/xds_client_stats.h
- src/core/ext/xds/xds_http_fault_filter.h
- src/core/ext/xds/xds_http_filters.h
- src/core/ext/xds/xds_routing.h
- src/core/lib/address_utils/parse_address.h
- src/core/lib/address_utils/sockaddr_utils.h
- src/core/lib/avl/avl.h
@ -1053,6 +1056,8 @@ libs:
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/max_age/max_age_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
- src/core/ext/filters/server_config_selector/server_config_selector.cc
- src/core/ext/filters/server_config_selector/server_config_selector_filter.cc
- src/core/ext/service_config/service_config.cc
- src/core/ext/service_config/service_config_parser.cc
- src/core/ext/transport/chttp2/alpn/alpn.cc
@ -1288,6 +1293,7 @@ libs:
- src/core/ext/xds/xds_client_stats.cc
- src/core/ext/xds/xds_http_fault_filter.cc
- src/core/ext/xds/xds_http_filters.cc
- src/core/ext/xds/xds_routing.cc
- src/core/ext/xds/xds_server_config_fetcher.cc
- src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc
@ -7685,10 +7691,8 @@ targets:
gtest: true
build: test
language: c++
headers:
- src/core/ext/filters/server_config_selector/server_config_selector.h
headers: []
src:
- src/core/ext/filters/server_config_selector/server_config_selector.cc
- test/core/server_config_selector/server_config_selector_test.cc
deps:
- grpc_test_util

4
config.m4 generated

@ -114,6 +114,8 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/server_config_selector/server_config_selector.cc \
src/core/ext/filters/server_config_selector/server_config_selector_filter.cc \
src/core/ext/service_config/service_config.cc \
src/core/ext/service_config/service_config_parser.cc \
src/core/ext/transport/chttp2/alpn/alpn.cc \
@ -362,6 +364,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/xds/xds_client_stats.cc \
src/core/ext/xds/xds_http_fault_filter.cc \
src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_routing.cc \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/address_utils/parse_address.cc \
src/core/lib/address_utils/sockaddr_utils.cc \
@ -1100,6 +1103,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/server)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/max_age)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/message_size)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/server_config_selector)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/service_config)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/transport/chttp2/alpn)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/transport/chttp2/client)

4
config.w32 generated

@ -80,6 +80,8 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " +
"src\\core\\ext\\filters\\max_age\\max_age_filter.cc " +
"src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +
"src\\core\\ext\\filters\\server_config_selector\\server_config_selector.cc " +
"src\\core\\ext\\filters\\server_config_selector\\server_config_selector_filter.cc " +
"src\\core\\ext\\service_config\\service_config.cc " +
"src\\core\\ext\\service_config\\service_config_parser.cc " +
"src\\core\\ext\\transport\\chttp2\\alpn\\alpn.cc " +
@ -328,6 +330,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\xds\\xds_client_stats.cc " +
"src\\core\\ext\\xds\\xds_http_fault_filter.cc " +
"src\\core\\ext\\xds\\xds_http_filters.cc " +
"src\\core\\ext\\xds\\xds_routing.cc " +
"src\\core\\ext\\xds\\xds_server_config_fetcher.cc " +
"src\\core\\lib\\address_utils\\parse_address.cc " +
"src\\core\\lib\\address_utils\\sockaddr_utils.cc " +
@ -1099,6 +1102,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\server");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\max_age");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\message_size");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\server_config_selector");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\service_config");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\transport");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\transport\\chttp2");

6
gRPC-C++.podspec generated

@ -270,6 +270,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/max_age/max_age_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/server_config_selector/server_config_selector.h',
'src/core/ext/filters/server_config_selector/server_config_selector_filter.h',
'src/core/ext/service_config/service_config.h',
'src/core/ext/service_config/service_config_call_data.h',
'src/core/ext/service_config/service_config_parser.h',
@ -553,6 +555,7 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_client_stats.h',
'src/core/ext/xds/xds_http_fault_filter.h',
'src/core/ext/xds/xds_http_filters.h',
'src/core/ext/xds/xds_routing.h',
'src/core/lib/address_utils/parse_address.h',
'src/core/lib/address_utils/sockaddr_utils.h',
'src/core/lib/avl/avl.h',
@ -998,6 +1001,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/max_age/max_age_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/server_config_selector/server_config_selector.h',
'src/core/ext/filters/server_config_selector/server_config_selector_filter.h',
'src/core/ext/service_config/service_config.h',
'src/core/ext/service_config/service_config_call_data.h',
'src/core/ext/service_config/service_config_parser.h',
@ -1263,6 +1268,7 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_client_stats.h',
'src/core/ext/xds/xds_http_fault_filter.h',
'src/core/ext/xds/xds_http_filters.h',
'src/core/ext/xds/xds_routing.h',
'src/core/lib/address_utils/parse_address.h',
'src/core/lib/address_utils/sockaddr_utils.h',
'src/core/lib/avl/avl.h',

9
gRPC-Core.podspec generated

@ -321,6 +321,10 @@ Pod::Spec.new do |s|
'src/core/ext/filters/max_age/max_age_filter.h',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/server_config_selector/server_config_selector.cc',
'src/core/ext/filters/server_config_selector/server_config_selector.h',
'src/core/ext/filters/server_config_selector/server_config_selector_filter.cc',
'src/core/ext/filters/server_config_selector/server_config_selector_filter.h',
'src/core/ext/service_config/service_config.cc',
'src/core/ext/service_config/service_config.h',
'src/core/ext/service_config/service_config_call_data.h',
@ -814,6 +818,8 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_http_fault_filter.h',
'src/core/ext/xds/xds_http_filters.cc',
'src/core/ext/xds/xds_http_filters.h',
'src/core/ext/xds/xds_routing.cc',
'src/core/ext/xds/xds_routing.h',
'src/core/ext/xds/xds_server_config_fetcher.cc',
'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/parse_address.h',
@ -1542,6 +1548,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/max_age/max_age_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/server_config_selector/server_config_selector.h',
'src/core/ext/filters/server_config_selector/server_config_selector_filter.h',
'src/core/ext/service_config/service_config.h',
'src/core/ext/service_config/service_config_call_data.h',
'src/core/ext/service_config/service_config_parser.h',
@ -1787,6 +1795,7 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_client_stats.h',
'src/core/ext/xds/xds_http_fault_filter.h',
'src/core/ext/xds/xds_http_filters.h',
'src/core/ext/xds/xds_routing.h',
'src/core/lib/address_utils/parse_address.h',
'src/core/lib/address_utils/sockaddr_utils.h',
'src/core/lib/avl/avl.h',

6
grpc.gemspec generated

@ -241,6 +241,10 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/max_age/max_age_filter.h )
s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc )
s.files += %w( src/core/ext/filters/message_size/message_size_filter.h )
s.files += %w( src/core/ext/filters/server_config_selector/server_config_selector.cc )
s.files += %w( src/core/ext/filters/server_config_selector/server_config_selector.h )
s.files += %w( src/core/ext/filters/server_config_selector/server_config_selector_filter.cc )
s.files += %w( src/core/ext/filters/server_config_selector/server_config_selector_filter.h )
s.files += %w( src/core/ext/service_config/service_config.cc )
s.files += %w( src/core/ext/service_config/service_config.h )
s.files += %w( src/core/ext/service_config/service_config_call_data.h )
@ -734,6 +738,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/xds/xds_http_fault_filter.h )
s.files += %w( src/core/ext/xds/xds_http_filters.cc )
s.files += %w( src/core/ext/xds/xds_http_filters.h )
s.files += %w( src/core/ext/xds/xds_routing.cc )
s.files += %w( src/core/ext/xds/xds_routing.h )
s.files += %w( src/core/ext/xds/xds_server_config_fetcher.cc )
s.files += %w( src/core/lib/address_utils/parse_address.cc )
s.files += %w( src/core/lib/address_utils/parse_address.h )

3
grpc.gyp generated

@ -560,6 +560,8 @@
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/server_config_selector/server_config_selector.cc',
'src/core/ext/filters/server_config_selector/server_config_selector_filter.cc',
'src/core/ext/service_config/service_config.cc',
'src/core/ext/service_config/service_config_parser.cc',
'src/core/ext/transport/chttp2/alpn/alpn.cc',
@ -795,6 +797,7 @@
'src/core/ext/xds/xds_client_stats.cc',
'src/core/ext/xds/xds_http_fault_filter.cc',
'src/core/ext/xds/xds_http_filters.cc',
'src/core/ext/xds/xds_routing.cc',
'src/core/ext/xds/xds_server_config_fetcher.cc',
'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/sockaddr_utils.cc',

6
package.xml generated

@ -221,6 +221,10 @@
<file baseinstalldir="/" name="src/core/ext/filters/max_age/max_age_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/server_config_selector/server_config_selector.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/server_config_selector/server_config_selector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/server_config_selector/server_config_selector_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/server_config_selector/server_config_selector_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/service_config/service_config.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/service_config/service_config.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/service_config/service_config_call_data.h" role="src" />
@ -714,6 +718,8 @@
<file baseinstalldir="/" name="src/core/ext/xds/xds_http_fault_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_http_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_http_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_routing.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_routing.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_server_config_fetcher.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/address_utils/parse_address.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/address_utils/parse_address.h" role="src" />

@ -31,6 +31,7 @@
#include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_routing.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -257,6 +258,8 @@ class XdsResolver : public Resolver {
};
using RouteTable = std::vector<Route>;
class RouteListIterator;
void MaybeAddCluster(const std::string& name);
grpc_error_handle CreateMethodConfig(
const XdsApi::Route& route,
@ -323,6 +326,25 @@ bool XdsResolver::XdsConfigSelector::Route::operator==(
MethodConfigsEqual(method_config.get(), other.method_config.get());
}
// Implementation of XdsRouting::RouteListIterator for getting the matching
// route for a request.
class XdsResolver::XdsConfigSelector::RouteListIterator
: public XdsRouting::RouteListIterator {
public:
explicit RouteListIterator(const RouteTable* route_table)
: route_table_(route_table) {}
size_t Size() const override { return route_table_->size(); }
const XdsApi::Route::Matchers& GetMatchersForRoute(
size_t index) const override {
return (*route_table_)[index].route.matchers;
}
private:
const RouteTable* route_table_;
};
//
// XdsResolver::XdsConfigSelector
//
@ -408,25 +430,6 @@ XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
resolver_->MaybeRemoveUnusedClusters();
}
const XdsHttpFilterImpl::FilterConfig* FindFilterConfigOverride(
const std::string& instance_name,
const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route,
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight) {
// Check ClusterWeight, if any.
if (cluster_weight != nullptr) {
auto it = cluster_weight->typed_per_filter_config.find(instance_name);
if (it != cluster_weight->typed_per_filter_config.end()) return &it->second;
}
// Check Route.
auto it = route.typed_per_filter_config.find(instance_name);
if (it != route.typed_per_filter_config.end()) return &it->second;
// Check VirtualHost.
it = vhost.typed_per_filter_config.find(instance_name);
if (it != vhost.typed_per_filter_config.end()) return &it->second;
// Not found.
return nullptr;
}
grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
const XdsApi::Route& route,
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
@ -483,39 +486,15 @@ grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
route_action.max_stream_duration->nanos));
}
// Handle xDS HTTP filters.
std::map<std::string, std::vector<std::string>> per_filter_configs;
grpc_channel_args* args = grpc_channel_args_copy(resolver_->args_);
for (const auto& http_filter :
resolver_->current_listener_.http_connection_manager.http_filters) {
// Find filter. This is guaranteed to succeed, because it's checked
// at config validation time in the XdsApi code.
const XdsHttpFilterImpl* filter_impl =
XdsHttpFilterRegistry::GetFilterForType(
http_filter.config.config_proto_type_name);
GPR_ASSERT(filter_impl != nullptr);
// If there is not actually any C-core filter associated with this
// xDS filter, then it won't need any config, so skip it.
if (filter_impl->channel_filter() == nullptr) continue;
// Allow filter to add channel args that may affect service config
// parsing.
args = filter_impl->ModifyChannelArgs(args);
// Find config override, if any.
const XdsHttpFilterImpl::FilterConfig* config_override =
FindFilterConfigOverride(http_filter.name,
resolver_->current_virtual_host_, route,
cluster_weight);
// Generate service config for filter.
auto method_config_field =
filter_impl->GenerateServiceConfig(http_filter.config, config_override);
if (!method_config_field.ok()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"failed to generate method config for HTTP filter ", http_filter.name,
": ", method_config_field.status().ToString()));
}
per_filter_configs[method_config_field->service_config_field_name]
.push_back(method_config_field->element);
}
for (const auto& p : per_filter_configs) {
XdsRouting::GeneratePerHttpFilterConfigsResult result =
XdsRouting::GeneratePerHTTPFilterConfigs(
resolver_->current_listener_.http_connection_manager.http_filters,
resolver_->current_virtual_host_, route, cluster_weight,
grpc_channel_args_copy(resolver_->args_));
if (result.error != GRPC_ERROR_NONE) {
return result.error;
}
for (const auto& p : result.per_filter_configs) {
fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n",
absl::StrJoin(p.second, ",\n"),
"\n ]"));
@ -533,9 +512,9 @@ grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
absl::StrJoin(fields, ",\n"),
"\n } ]\n"
"}");
*method_config = ServiceConfig::Create(args, json.c_str(), &error);
*method_config = ServiceConfig::Create(result.args, json.c_str(), &error);
}
grpc_channel_args_destroy(args);
grpc_channel_args_destroy(result.args);
return error;
}
@ -556,39 +535,13 @@ void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
}
}
absl::optional<absl::string_view> GetHeaderValue(
grpc_metadata_batch* initial_metadata, absl::string_view header_name,
std::string* concatenated_value) {
// Note: If we ever allow binary headers here, we still need to
// special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
// they are not visible to the LB policy in grpc-go.
if (absl::EndsWith(header_name, "-bin")) {
return absl::nullopt;
} else if (header_name == "content-type") {
return "application/grpc";
}
return initial_metadata->GetValue(header_name, concatenated_value);
}
bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
grpc_metadata_batch* initial_metadata) {
for (const auto& header_matcher : header_matchers) {
std::string concatenated_value;
if (!header_matcher.Match(GetHeaderValue(
initial_metadata, header_matcher.name(), &concatenated_value))) {
return false;
}
}
return true;
}
absl::optional<uint64_t> HeaderHashHelper(
const XdsApi::Route::RouteAction::HashPolicy& policy,
grpc_metadata_batch* initial_metadata) {
GPR_ASSERT(policy.type == XdsApi::Route::RouteAction::HashPolicy::HEADER);
std::string value_buffer;
absl::optional<absl::string_view> header_value =
GetHeaderValue(initial_metadata, policy.header_name, &value_buffer);
absl::optional<absl::string_view> header_value = XdsRouting::GetHeaderValue(
initial_metadata, policy.header_name, &value_buffer);
if (!header_value.has_value()) {
return absl::nullopt;
}
@ -604,129 +557,112 @@ absl::optional<uint64_t> HeaderHashHelper(
return XXH64(header_value->data(), header_value->size(), 0);
}
bool UnderFraction(const uint32_t fraction_per_million) {
// Generate a random number in [0, 1000000).
const uint32_t random_number = rand() % 1000000;
return random_number < fraction_per_million;
}
ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
GetCallConfigArgs args) {
for (const auto& entry : route_table_) {
// Path matching.
if (!entry.route.matchers.path_matcher.Match(
StringViewFromSlice(*args.path))) {
continue;
}
// Header Matching.
if (!HeadersMatch(entry.route.matchers.header_matchers,
args.initial_metadata)) {
continue;
}
// Match fraction check
if (entry.route.matchers.fraction_per_million.has_value() &&
!UnderFraction(entry.route.matchers.fraction_per_million.value())) {
continue;
}
// Found a route match
const auto* route_action =
absl::get_if<XdsApi::Route::RouteAction>(&entry.route.action);
if (route_action == nullptr) {
CallConfig call_config;
call_config.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Matching route has inappropriate action"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
return call_config;
}
absl::string_view cluster_name;
RefCountedPtr<ServiceConfig> method_config;
if (route_action->weighted_clusters.empty()) {
cluster_name = route_action->cluster_name;
method_config = entry.method_config;
} else {
const uint32_t key =
rand() %
entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
.range_end;
// Find the index in weighted clusters corresponding to key.
size_t mid = 0;
size_t start_index = 0;
size_t end_index = entry.weighted_cluster_state.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (entry.weighted_cluster_state[mid].range_end > key) {
end_index = mid;
} else if (entry.weighted_cluster_state[mid].range_end < key) {
start_index = mid + 1;
} else {
index = mid + 1;
break;
}
}
if (index == 0) index = start_index;
GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
cluster_name = entry.weighted_cluster_state[index].cluster;
method_config = entry.weighted_cluster_state[index].method_config;
}
auto it = clusters_.find(cluster_name);
GPR_ASSERT(it != clusters_.end());
// Generate a hash.
absl::optional<uint64_t> hash;
for (const auto& hash_policy : route_action->hash_policies) {
absl::optional<uint64_t> new_hash;
switch (hash_policy.type) {
case XdsApi::Route::RouteAction::HashPolicy::HEADER:
new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
break;
case XdsApi::Route::RouteAction::HashPolicy::CHANNEL_ID:
new_hash = static_cast<uint64_t>(
reinterpret_cast<uintptr_t>(resolver_.get()));
break;
default:
GPR_ASSERT(0);
}
if (new_hash.has_value()) {
// Rotating the old value prevents duplicate hash rules from cancelling
// each other out and preserves all of the entropy
const uint64_t old_value =
hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
hash = old_value ^ new_hash.value();
}
// If the policy is a terminal policy and a hash has been generated,
// ignore the rest of the hash policies.
if (hash_policy.terminal && hash.has_value()) {
break;
}
}
if (!hash.has_value()) {
// If there is no hash, we just choose a random value as a default.
// We cannot directly use the result of rand() as the hash value,
// since it is a 32-bit number and not a 64-bit number and will
// therefore not be evenly distributed.
uint32_t upper = rand();
uint32_t lower = rand();
hash = (static_cast<uint64_t>(upper) << 32) | lower;
}
auto route_index = XdsRouting::GetRouteForRequest(
RouteListIterator(&route_table_), StringViewFromSlice(*args.path),
args.initial_metadata);
if (!route_index.has_value()) {
return CallConfig();
}
auto& entry = route_table_[*route_index];
// Found a route match
const auto* route_action =
absl::get_if<XdsApi::Route::RouteAction>(&entry.route.action);
if (route_action == nullptr) {
CallConfig call_config;
if (method_config != nullptr) {
call_config.method_configs =
method_config->GetMethodParsedConfigVector(grpc_empty_slice());
call_config.service_config = std::move(method_config);
}
call_config.call_attributes[kXdsClusterAttribute] = it->first;
std::string hash_string = absl::StrCat(hash.value());
char* hash_value =
static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
memcpy(hash_value, hash_string.c_str(), hash_string.size());
hash_value[hash_string.size()] = '\0';
call_config.call_attributes[kRequestRingHashAttribute] = hash_value;
call_config.call_dispatch_controller =
args.arena->New<XdsCallDispatchController>(it->second->Ref());
call_config.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Matching route has inappropriate action"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
return call_config;
}
return CallConfig();
absl::string_view cluster_name;
RefCountedPtr<ServiceConfig> method_config;
if (route_action->weighted_clusters.empty()) {
cluster_name = route_action->cluster_name;
method_config = entry.method_config;
} else {
const uint32_t key =
rand() %
entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
.range_end;
// Find the index in weighted clusters corresponding to key.
size_t mid = 0;
size_t start_index = 0;
size_t end_index = entry.weighted_cluster_state.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (entry.weighted_cluster_state[mid].range_end > key) {
end_index = mid;
} else if (entry.weighted_cluster_state[mid].range_end < key) {
start_index = mid + 1;
} else {
index = mid + 1;
break;
}
}
if (index == 0) index = start_index;
GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
cluster_name = entry.weighted_cluster_state[index].cluster;
method_config = entry.weighted_cluster_state[index].method_config;
}
auto it = clusters_.find(cluster_name);
GPR_ASSERT(it != clusters_.end());
// Generate a hash.
absl::optional<uint64_t> hash;
for (const auto& hash_policy : route_action->hash_policies) {
absl::optional<uint64_t> new_hash;
switch (hash_policy.type) {
case XdsApi::Route::RouteAction::HashPolicy::HEADER:
new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
break;
case XdsApi::Route::RouteAction::HashPolicy::CHANNEL_ID:
new_hash =
static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver_.get()));
break;
default:
GPR_ASSERT(0);
}
if (new_hash.has_value()) {
// Rotating the old value prevents duplicate hash rules from cancelling
// each other out and preserves all of the entropy
const uint64_t old_value =
hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
hash = old_value ^ new_hash.value();
}
// If the policy is a terminal policy and a hash has been generated,
// ignore the rest of the hash policies.
if (hash_policy.terminal && hash.has_value()) {
break;
}
}
if (!hash.has_value()) {
// If there is no hash, we just choose a random value as a default.
// We cannot directly use the result of rand() as the hash value,
// since it is a 32-bit number and not a 64-bit number and will
// therefore not be evenly distributed.
uint32_t upper = rand();
uint32_t lower = rand();
hash = (static_cast<uint64_t>(upper) << 32) | lower;
}
CallConfig call_config;
if (method_config != nullptr) {
call_config.method_configs =
method_config->GetMethodParsedConfigVector(grpc_empty_slice());
call_config.service_config = std::move(method_config);
}
call_config.call_attributes[kXdsClusterAttribute] = it->first;
std::string hash_string = absl::StrCat(hash.value());
char* hash_value =
static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
memcpy(hash_value, hash_string.c_str(), hash_string.size());
hash_value[hash_string.size()] = '\0';
call_config.call_attributes[kRequestRingHashAttribute] = hash_value;
call_config.call_dispatch_controller =
args.arena->New<XdsCallDispatchController>(it->second->Ref());
return call_config;
}
//
@ -808,6 +744,25 @@ void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
}
}
namespace {
class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator {
public:
explicit VirtualHostListIterator(
const std::vector<XdsApi::RdsUpdate::VirtualHost>* virtual_hosts)
: virtual_hosts_(virtual_hosts) {}
size_t Size() const override { return virtual_hosts_->size(); }
const std::vector<std::string>& GetDomainsForVirtualHost(
size_t index) const override {
return (*virtual_hosts_)[index].domains;
}
private:
const std::vector<XdsApi::RdsUpdate::VirtualHost>* virtual_hosts_;
};
} // namespace
void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
@ -816,16 +771,16 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
return;
}
// Find the relevant VirtualHost from the RouteConfiguration.
XdsApi::RdsUpdate::VirtualHost* vhost =
rds_update.FindVirtualHostForDomain(server_name_);
if (vhost == nullptr) {
auto vhost_index = XdsRouting::FindVirtualHostForDomain(
VirtualHostListIterator(&rds_update.virtual_hosts), server_name_);
if (!vhost_index.has_value()) {
OnError(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("could not find VirtualHost for ", server_name_,
" in RouteConfiguration")));
return;
}
// Save the virtual host in the resolver.
current_virtual_host_ = std::move(*vhost);
current_virtual_host_ = std::move(rds_update.virtual_hosts[*vhost_index]);
// Send a new result to the channel.
GenerateResult();
}

@ -89,6 +89,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/xds/xds_routing.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
@ -343,104 +344,6 @@ std::string XdsApi::RdsUpdate::ToString() const {
return absl::StrJoin(vhosts, "");
}
namespace {
// Better match type has smaller value.
enum MatchType {
EXACT_MATCH,
SUFFIX_MATCH,
PREFIX_MATCH,
UNIVERSE_MATCH,
INVALID_MATCH,
};
// Returns true if match succeeds.
bool DomainMatch(MatchType match_type, const std::string& domain_pattern_in,
const std::string& expected_host_name_in) {
// Normalize the args to lower-case. Domain matching is case-insensitive.
std::string domain_pattern = domain_pattern_in;
std::string expected_host_name = expected_host_name_in;
std::transform(domain_pattern.begin(), domain_pattern.end(),
domain_pattern.begin(),
[](unsigned char c) { return std::tolower(c); });
std::transform(expected_host_name.begin(), expected_host_name.end(),
expected_host_name.begin(),
[](unsigned char c) { return std::tolower(c); });
if (match_type == EXACT_MATCH) {
return domain_pattern == expected_host_name;
} else if (match_type == SUFFIX_MATCH) {
// Asterisk must match at least one char.
if (expected_host_name.size() < domain_pattern.size()) return false;
absl::string_view pattern_suffix(domain_pattern.c_str() + 1);
absl::string_view host_suffix(expected_host_name.c_str() +
expected_host_name.size() -
pattern_suffix.size());
return pattern_suffix == host_suffix;
} else if (match_type == PREFIX_MATCH) {
// Asterisk must match at least one char.
if (expected_host_name.size() < domain_pattern.size()) return false;
absl::string_view pattern_prefix(domain_pattern.c_str(),
domain_pattern.size() - 1);
absl::string_view host_prefix(expected_host_name.c_str(),
pattern_prefix.size());
return pattern_prefix == host_prefix;
} else {
return match_type == UNIVERSE_MATCH;
}
}
MatchType DomainPatternMatchType(const std::string& domain_pattern) {
if (domain_pattern.empty()) return INVALID_MATCH;
if (domain_pattern.find('*') == std::string::npos) return EXACT_MATCH;
if (domain_pattern == "*") return UNIVERSE_MATCH;
if (domain_pattern[0] == '*') return SUFFIX_MATCH;
if (domain_pattern[domain_pattern.size() - 1] == '*') return PREFIX_MATCH;
return INVALID_MATCH;
}
} // namespace
XdsApi::RdsUpdate::VirtualHost* XdsApi::RdsUpdate::FindVirtualHostForDomain(
const std::string& domain) {
// Find the best matched virtual host.
// The search order for 4 groups of domain patterns:
// 1. Exact match.
// 2. Suffix match (e.g., "*ABC").
// 3. Prefix match (e.g., "ABC*").
// 4. Universe match (i.e., "*").
// Within each group, longest match wins.
// If the same best matched domain pattern appears in multiple virtual hosts,
// the first matched virtual host wins.
VirtualHost* target_vhost = nullptr;
MatchType best_match_type = INVALID_MATCH;
size_t longest_match = 0;
// Check each domain pattern in each virtual host to determine the best
// matched virtual host.
for (VirtualHost& vhost : virtual_hosts) {
for (const std::string& domain_pattern : vhost.domains) {
// Check the match type first. Skip the pattern if it's not better than
// current match.
const MatchType match_type = DomainPatternMatchType(domain_pattern);
// This should be caught by RouteConfigParse().
GPR_ASSERT(match_type != INVALID_MATCH);
if (match_type > best_match_type) continue;
if (match_type == best_match_type &&
domain_pattern.size() <= longest_match) {
continue;
}
// Skip if match fails.
if (!DomainMatch(match_type, domain_pattern, domain)) continue;
// Choose this match.
target_vhost = &vhost;
best_match_type = match_type;
longest_match = domain_pattern.size();
if (best_match_type == EXACT_MATCH) break;
}
if (best_match_type == EXACT_MATCH) break;
}
return target_vhost;
}
//
// XdsApi::CommonTlsContext::CertificateValidationContext
//
@ -1917,8 +1820,7 @@ grpc_error_handle RouteConfigParse(
virtual_hosts[i], &domain_size);
for (size_t j = 0; j < domain_size; ++j) {
std::string domain_pattern = UpbStringToStdString(domains[j]);
const MatchType match_type = DomainPatternMatchType(domain_pattern);
if (match_type == INVALID_MATCH) {
if (!XdsRouting::IsValidDomainPattern(domain_pattern)) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("Invalid domain pattern \"", domain_pattern, "\"."));
}

@ -43,6 +43,8 @@
namespace grpc_core {
bool XdsRbacEnabled();
class XdsClient;
class XdsApi {
@ -206,7 +208,6 @@ class XdsApi {
return virtual_hosts == other.virtual_hosts;
}
std::string ToString() const;
VirtualHost* FindVirtualHostForDomain(const std::string& domain);
};
struct CommonTlsContext {

@ -0,0 +1,247 @@
//
//
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/xds/xds_routing.h"
#include <cctype>
namespace grpc_core {
namespace {
enum MatchType {
EXACT_MATCH,
SUFFIX_MATCH,
PREFIX_MATCH,
UNIVERSE_MATCH,
INVALID_MATCH,
};
// Returns true if match succeeds.
bool DomainMatch(MatchType match_type, absl::string_view domain_pattern_in,
absl::string_view expected_host_name_in) {
// Normalize the args to lower-case. Domain matching is case-insensitive.
std::string domain_pattern = std::string(domain_pattern_in);
std::string expected_host_name = std::string(expected_host_name_in);
std::transform(domain_pattern.begin(), domain_pattern.end(),
domain_pattern.begin(),
[](unsigned char c) { return std::tolower(c); });
std::transform(expected_host_name.begin(), expected_host_name.end(),
expected_host_name.begin(),
[](unsigned char c) { return std::tolower(c); });
if (match_type == EXACT_MATCH) {
return domain_pattern == expected_host_name;
} else if (match_type == SUFFIX_MATCH) {
// Asterisk must match at least one char.
if (expected_host_name.size() < domain_pattern.size()) return false;
absl::string_view pattern_suffix(domain_pattern.c_str() + 1);
absl::string_view host_suffix(expected_host_name.c_str() +
expected_host_name.size() -
pattern_suffix.size());
return pattern_suffix == host_suffix;
} else if (match_type == PREFIX_MATCH) {
// Asterisk must match at least one char.
if (expected_host_name.size() < domain_pattern.size()) return false;
absl::string_view pattern_prefix(domain_pattern.c_str(),
domain_pattern.size() - 1);
absl::string_view host_prefix(expected_host_name.c_str(),
pattern_prefix.size());
return pattern_prefix == host_prefix;
} else {
return match_type == UNIVERSE_MATCH;
}
}
MatchType DomainPatternMatchType(absl::string_view domain_pattern) {
if (domain_pattern.empty()) return INVALID_MATCH;
if (domain_pattern.find('*') == std::string::npos) return EXACT_MATCH;
if (domain_pattern == "*") return UNIVERSE_MATCH;
if (domain_pattern[0] == '*') return SUFFIX_MATCH;
if (domain_pattern[domain_pattern.size() - 1] == '*') return PREFIX_MATCH;
return INVALID_MATCH;
}
} // namespace
absl::optional<size_t> XdsRouting::FindVirtualHostForDomain(
const VirtualHostListIterator& vhost_iterator, absl::string_view domain) {
// Find the best matched virtual host.
// The search order for 4 groups of domain patterns:
// 1. Exact match.
// 2. Suffix match (e.g., "*ABC").
// 3. Prefix match (e.g., "ABC*").
// 4. Universe match (i.e., "*").
// Within each group, longest match wins.
// If the same best matched domain pattern appears in multiple virtual
// hosts, the first matched virtual host wins.
absl::optional<size_t> target_index;
MatchType best_match_type = INVALID_MATCH;
size_t longest_match = 0;
// Check each domain pattern in each virtual host to determine the best
// matched virtual host.
for (size_t i = 0; i < vhost_iterator.Size(); ++i) {
const auto& domains = vhost_iterator.GetDomainsForVirtualHost(i);
for (const std::string& domain_pattern : domains) {
// Check the match type first. Skip the pattern if it's not better
// than current match.
const MatchType match_type = DomainPatternMatchType(domain_pattern);
// This should be caught by RouteConfigParse().
GPR_ASSERT(match_type != INVALID_MATCH);
if (match_type > best_match_type) continue;
if (match_type == best_match_type &&
domain_pattern.size() <= longest_match) {
continue;
}
// Skip if match fails.
if (!DomainMatch(match_type, domain_pattern, domain)) continue;
// Choose this match.
target_index = i;
best_match_type = match_type;
longest_match = domain_pattern.size();
if (best_match_type == EXACT_MATCH) break;
}
if (best_match_type == EXACT_MATCH) break;
}
return target_index;
}
namespace {
bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
grpc_metadata_batch* initial_metadata) {
for (const auto& header_matcher : header_matchers) {
std::string concatenated_value;
if (!header_matcher.Match(XdsRouting::GetHeaderValue(
initial_metadata, header_matcher.name(), &concatenated_value))) {
return false;
}
}
return true;
}
bool UnderFraction(const uint32_t fraction_per_million) {
// Generate a random number in [0, 1000000).
const uint32_t random_number = rand() % 1000000;
return random_number < fraction_per_million;
}
} // namespace
absl::optional<size_t> XdsRouting::GetRouteForRequest(
const RouteListIterator& route_list_iterator, absl::string_view path,
grpc_metadata_batch* initial_metadata) {
for (size_t i = 0; i < route_list_iterator.Size(); ++i) {
const XdsApi::Route::Matchers& matchers =
route_list_iterator.GetMatchersForRoute(i);
if (matchers.path_matcher.Match(path) &&
HeadersMatch(matchers.header_matchers, initial_metadata) &&
(!matchers.fraction_per_million.has_value() ||
UnderFraction(*matchers.fraction_per_million))) {
return i;
}
}
return absl::nullopt;
}
bool XdsRouting::IsValidDomainPattern(absl::string_view domain_pattern) {
return DomainPatternMatchType(domain_pattern) != INVALID_MATCH;
}
absl::optional<absl::string_view> XdsRouting::GetHeaderValue(
grpc_metadata_batch* initial_metadata, absl::string_view header_name,
std::string* concatenated_value) {
// Note: If we ever allow binary headers here, we still need to
// special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
// they are not visible to the LB policy in grpc-go.
if (absl::EndsWith(header_name, "-bin")) {
return absl::nullopt;
} else if (header_name == "content-type") {
return "application/grpc";
}
return grpc_metadata_batch_get_value(initial_metadata, header_name,
concatenated_value);
}
namespace {
const XdsHttpFilterImpl::FilterConfig* FindFilterConfigOverride(
const std::string& instance_name,
const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route,
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight) {
// Check ClusterWeight, if any.
if (cluster_weight != nullptr) {
auto it = cluster_weight->typed_per_filter_config.find(instance_name);
if (it != cluster_weight->typed_per_filter_config.end()) return &it->second;
}
// Check Route.
auto it = route.typed_per_filter_config.find(instance_name);
if (it != route.typed_per_filter_config.end()) return &it->second;
// Check VirtualHost.
it = vhost.typed_per_filter_config.find(instance_name);
if (it != vhost.typed_per_filter_config.end()) return &it->second;
// Not found.
return nullptr;
}
} // namespace
XdsRouting::GeneratePerHttpFilterConfigsResult
XdsRouting::GeneratePerHTTPFilterConfigs(
const std::vector<XdsApi::LdsUpdate::HttpConnectionManager::HttpFilter>&
http_filters,
const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route,
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
grpc_channel_args* args) {
GeneratePerHttpFilterConfigsResult result;
result.args = args;
for (const auto& http_filter : http_filters) {
// Find filter. This is guaranteed to succeed, because it's checked
// at config validation time in the XdsApi code.
const XdsHttpFilterImpl* filter_impl =
XdsHttpFilterRegistry::GetFilterForType(
http_filter.config.config_proto_type_name);
GPR_ASSERT(filter_impl != nullptr);
// If there is not actually any C-core filter associated with this
// xDS filter, then it won't need any config, so skip it.
if (filter_impl->channel_filter() == nullptr) continue;
// Allow filter to add channel args that may affect service config
// parsing.
result.args = filter_impl->ModifyChannelArgs(result.args);
// Find config override, if any.
const XdsHttpFilterImpl::FilterConfig* config_override =
FindFilterConfigOverride(http_filter.name, vhost, route,
cluster_weight);
// Generate service config for filter.
auto method_config_field =
filter_impl->GenerateServiceConfig(http_filter.config, config_override);
if (!method_config_field.ok()) {
grpc_channel_args_destroy(result.args);
result.args = nullptr;
result.error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"failed to generate method config for HTTP filter ", http_filter.name,
": ", method_config_field.status().ToString()));
break;
}
result.per_filter_configs[method_config_field->service_config_field_name]
.push_back(method_config_field->element);
}
return result;
}
} // namespace grpc_core

@ -0,0 +1,98 @@
//
//
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_CORE_EXT_XDS_XDS_ROUTING_H
#define GRPC_CORE_EXT_XDS_XDS_ROUTING_H
#include <grpc/support/port_platform.h>
#include <vector>
#include "absl/strings/string_view.h"
#include <grpc/support/log.h>
#include "src/core/ext/xds/xds_api.h"
#include "src/core/lib/matchers/matchers.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
class XdsRouting {
public:
class VirtualHostListIterator {
public:
virtual ~VirtualHostListIterator() = default;
// Returns the number of virtual hosts in the list.
virtual size_t Size() const = 0;
// Returns the domain list for the virtual host at the specified index.
virtual const std::vector<std::string>& GetDomainsForVirtualHost(
size_t index) const = 0;
};
class RouteListIterator {
public:
virtual ~RouteListIterator() = default;
// Number of routes.
virtual size_t Size() const = 0;
// Returns the matchers for the route at the specified index.
virtual const XdsApi::Route::Matchers& GetMatchersForRoute(
size_t index) const = 0;
};
// Returns the index of the selected virtual host in the list.
static absl::optional<size_t> FindVirtualHostForDomain(
const VirtualHostListIterator& vhost_iterator, absl::string_view domain);
// Returns the index in route_list_iterator to use for a request with
// the specified path and metadata, or nullopt if no route matches.
static absl::optional<size_t> GetRouteForRequest(
const RouteListIterator& route_list_iterator, absl::string_view path,
grpc_metadata_batch* initial_metadata);
// Returns true if \a domain_pattern is a valid domain pattern, false
// otherwise.
static bool IsValidDomainPattern(absl::string_view domain_pattern);
// Returns the metadata value(s) for the specified key.
// As special cases, binary headers return a value of absl::nullopt, and
// "content-type" header returns "application/grpc".
static absl::optional<absl::string_view> GetHeaderValue(
grpc_metadata_batch* initial_metadata, absl::string_view header_name,
std::string* concatenated_value);
struct GeneratePerHttpFilterConfigsResult {
// Map of field name to list of elements for that field
std::map<std::string, std::vector<std::string>> per_filter_configs;
grpc_error_handle error = GRPC_ERROR_NONE;
// Guaranteed to be nullptr if error is GRPC_ERROR_NONE
grpc_channel_args* args = nullptr;
};
// Generates a map of per_filter_configs. \a args is consumed.
static GeneratePerHttpFilterConfigsResult GeneratePerHTTPFilterConfigs(
const std::vector<XdsApi::LdsUpdate::HttpConnectionManager::HttpFilter>&
http_filters,
const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route,
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
grpc_channel_args* args);
};
} // namespace grpc_core
#endif // GRPC_CORE_EXT_XDS_XDS_ROUTING_H

File diff suppressed because it is too large Load Diff

@ -33,6 +33,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/surface/completion_queue.h"
@ -465,7 +466,8 @@ struct grpc_server {
struct grpc_server_config_fetcher {
public:
class ConnectionManager : public grpc_core::RefCounted<ConnectionManager> {
class ConnectionManager
: public grpc_core::DualRefCounted<ConnectionManager> {
public:
// Ownership of \a args is transfered.
virtual absl::StatusOr<grpc_channel_args*> UpdateChannelArgsForConnection(

@ -89,6 +89,8 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/server_config_selector/server_config_selector.cc',
'src/core/ext/filters/server_config_selector/server_config_selector_filter.cc',
'src/core/ext/service_config/service_config.cc',
'src/core/ext/service_config/service_config_parser.cc',
'src/core/ext/transport/chttp2/alpn/alpn.cc',
@ -337,6 +339,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/xds/xds_client_stats.cc',
'src/core/ext/xds/xds_http_fault_filter.cc',
'src/core/ext/xds/xds_http_filters.cc',
'src/core/ext/xds/xds_routing.cc',
'src/core/ext/xds/xds_server_config_fetcher.cc',
'src/core/lib/address_utils/parse_address.cc',
'src/core/lib/address_utils/sockaddr_utils.cc',

@ -1335,9 +1335,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
expected_status);
}
bool WaitForRdsNack() {
bool WaitForRdsNack(StatusCode expected_status = StatusCode::UNAVAILABLE) {
return WaitForNack(
[&]() { return RouteConfigurationResponseState(0).state; });
[&]() { return RouteConfigurationResponseState(0).state; },
expected_status);
}
bool WaitForCdsNack() {
@ -1352,6 +1353,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
});
}
bool WaitForRouteConfigNack(
StatusCode expected_status = StatusCode::UNAVAILABLE) {
if (GetParam().enable_rds_testing()) {
return WaitForRdsNack(expected_status);
}
return WaitForLdsNack(expected_status);
}
AdsServiceImpl::ResponseState RouteConfigurationResponseState(int idx) const {
AdsServiceImpl* ads_service = balancers_[idx]->ads_service();
if (GetParam().enable_rds_testing()) {
@ -1421,7 +1430,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
hcm_accessor.Unpack(listener);
if (GetParam().enable_rds_testing()) {
auto* rds = http_connection_manager.mutable_rds();
rds->set_route_config_name(kDefaultRouteConfigurationName);
rds->set_route_config_name(route_config.name());
rds->mutable_config_source()->mutable_ads();
balancers_[idx]->ads_service()->SetRdsResource(route_config);
} else {
@ -9758,7 +9767,7 @@ TEST_P(XdsServerFilterChainMatchTest, DuplicateMatchOnSourcePortNacked) {
"filter chain: {source_ports={8080}}"));
}
class XdsServerRdsTest : public XdsServerSecurityTest {
class XdsServerRdsTest : public XdsEnabledServerStatusNotificationTest {
protected:
static void SetUpTestSuite() {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_RBAC", "true");
@ -9769,15 +9778,23 @@ class XdsServerRdsTest : public XdsServerSecurityTest {
}
};
TEST_P(XdsServerRdsTest, Basic) {
backends_[0]->Start();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
}
TEST_P(XdsServerRdsTest, NacksInvalidDomainPattern) {
RouteConfiguration route_config = default_server_route_config_;
route_config.mutable_virtual_hosts()->at(0).add_domains("");
SetServerListenerNameAndRouteConfiguration(
0, default_server_listener_, backends_[0]->port(), route_config);
backends_[0]->Start();
ASSERT_TRUE(WaitForLdsNack(StatusCode::DEADLINE_EXCEEDED))
ASSERT_TRUE(WaitForRouteConfigNack(StatusCode::DEADLINE_EXCEEDED))
<< "timed out waiting for NACK";
EXPECT_THAT(balancers_[0]->ads_service()->lds_response_state().error_message,
EXPECT_THAT(RouteConfigurationResponseState(0).error_message,
::testing::HasSubstr("Invalid domain pattern \"\""));
}
@ -9787,9 +9804,9 @@ TEST_P(XdsServerRdsTest, NacksEmptyDomainsList) {
SetServerListenerNameAndRouteConfiguration(
0, default_server_listener_, backends_[0]->port(), route_config);
backends_[0]->Start();
ASSERT_TRUE(WaitForLdsNack(StatusCode::DEADLINE_EXCEEDED))
ASSERT_TRUE(WaitForRouteConfigNack(StatusCode::DEADLINE_EXCEEDED))
<< "timed out waiting for NACK";
EXPECT_THAT(balancers_[0]->ads_service()->lds_response_state().error_message,
EXPECT_THAT(RouteConfigurationResponseState(0).error_message,
::testing::HasSubstr("VirtualHost has no domains"));
}
@ -9799,9 +9816,9 @@ TEST_P(XdsServerRdsTest, NacksEmptyRoutesList) {
SetServerListenerNameAndRouteConfiguration(
0, default_server_listener_, backends_[0]->port(), route_config);
backends_[0]->Start();
ASSERT_TRUE(WaitForLdsNack(StatusCode::DEADLINE_EXCEEDED))
ASSERT_TRUE(WaitForRouteConfigNack(StatusCode::DEADLINE_EXCEEDED))
<< "timed out waiting for NACK";
EXPECT_THAT(balancers_[0]->ads_service()->lds_response_state().error_message,
EXPECT_THAT(RouteConfigurationResponseState(0).error_message,
::testing::HasSubstr("No route found in the virtual host"));
}
@ -9815,12 +9832,123 @@ TEST_P(XdsServerRdsTest, NacksEmptyMatch) {
SetServerListenerNameAndRouteConfiguration(
0, default_server_listener_, backends_[0]->port(), route_config);
backends_[0]->Start();
ASSERT_TRUE(WaitForLdsNack(StatusCode::DEADLINE_EXCEEDED))
ASSERT_TRUE(WaitForRouteConfigNack(StatusCode::DEADLINE_EXCEEDED))
<< "timed out waiting for NACK";
EXPECT_THAT(balancers_[0]->ads_service()->lds_response_state().error_message,
EXPECT_THAT(RouteConfigurationResponseState(0).error_message,
::testing::HasSubstr("Match can't be null"));
}
TEST_P(XdsServerRdsTest, FailsRouteMatchesOtherThanNonForwardingAction) {
SetServerListenerNameAndRouteConfiguration(
0, default_server_listener_, backends_[0]->port(),
default_route_config_ /* inappropriate route config for servers */);
backends_[0]->Start();
// The server should be ready to serve but RPCs should fail.
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {},
true /* test_expects_failure */);
}
// Test that non-inline route configuration also works for non-default filter
// chains
TEST_P(XdsServerRdsTest, NonInlineRouteConfigurationNonDefaultFilterChain) {
if (!GetParam().enable_rds_testing()) {
return;
}
Listener listener = default_server_listener_;
auto* filter_chain = listener.add_filter_chains();
HttpConnectionManager http_connection_manager =
ServerHcmAccessor().Unpack(listener);
auto* rds = http_connection_manager.mutable_rds();
rds->set_route_config_name(kDefaultServerRouteConfigurationName);
rds->mutable_config_source()->mutable_ads();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
http_connection_manager);
SetServerListenerNameAndRouteConfiguration(0, listener, backends_[0]->port(),
default_server_route_config_);
backends_[0]->Start();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
}
TEST_P(XdsServerRdsTest, NonInlineRouteConfigurationNotAvailable) {
if (!GetParam().enable_rds_testing()) {
return;
}
Listener listener = default_server_listener_;
PopulateServerListenerNameAndPort(listener, backends_[0]->port());
HttpConnectionManager http_connection_manager =
ServerHcmAccessor().Unpack(listener);
auto* rds = http_connection_manager.mutable_rds();
rds->set_route_config_name("unknown_server_route_config");
rds->mutable_config_source()->mutable_ads();
listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom(
http_connection_manager);
SetServerListenerNameAndRouteConfiguration(0, listener, backends_[0]->port(),
default_server_route_config_);
backends_[0]->Start();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {},
true /* test_expects_failure */);
}
// TODO(yashykt): Once https://github.com/grpc/grpc/issues/24035 is fixed, we
// should add tests that make sure that different route configs are used for
// incoming connections with a different match.
TEST_P(XdsServerRdsTest, MultipleRouteConfigurations) {
Listener listener = default_server_listener_;
// Set a filter chain with a new route config name
auto new_route_config = default_server_route_config_;
new_route_config.set_name("new_server_route_config");
HttpConnectionManager http_connection_manager =
ServerHcmAccessor().Unpack(listener);
auto* rds = http_connection_manager.mutable_rds();
rds->set_route_config_name(new_route_config.name());
rds->mutable_config_source()->mutable_ads();
listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom(
http_connection_manager);
// Set another filter chain with another route config name
auto another_route_config = default_server_route_config_;
another_route_config.set_name("another_server_route_config");
http_connection_manager.mutable_rds()->set_route_config_name(
another_route_config.name());
auto* filter_chain = listener.add_filter_chains();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
http_connection_manager);
filter_chain->mutable_filter_chain_match()->set_source_type(
FilterChainMatch::SAME_IP_OR_LOOPBACK);
// Add another filter chain with the same route config name
filter_chain = listener.add_filter_chains();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
http_connection_manager);
filter_chain->mutable_filter_chain_match()->set_source_type(
FilterChainMatch::EXTERNAL);
// Add another filter chain with an inline route config
filter_chain = listener.add_filter_chains();
filter_chain->mutable_filter_chain_match()->add_source_ports(1234);
http_connection_manager = ServerHcmAccessor().Unpack(listener);
*http_connection_manager.mutable_route_config() =
default_server_route_config_;
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
http_connection_manager);
// Set resources on the ADS service
balancers_[0]->ads_service()->SetRdsResource(new_route_config);
balancers_[0]->ads_service()->SetRdsResource(another_route_config);
SetServerListenerNameAndRouteConfiguration(0, listener, backends_[0]->port(),
default_server_route_config_);
backends_[0]->Start();
backends_[0]->notifier()->WaitOnServingStatusChange(
absl::StrCat(ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port()),
grpc::StatusCode::OK);
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
}
using EdsTest = BasicTest;
// Tests that EDS client should send a NACK if the EDS update contains
@ -12538,12 +12666,14 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, XdsServerFilterChainMatchTest,
&TestTypeName);
// We are only testing the server here.
// TODO(yashykt): Also add a test type with set_enable_rds_testing() once we
// start fetching non-inline resources.
INSTANTIATE_TEST_SUITE_P(XdsTest, XdsServerRdsTest,
::testing::Values(TestType()
.set_use_fake_resolver()
.set_use_xds_credentials()),
.set_use_xds_credentials(),
TestType()
.set_use_fake_resolver()
.set_use_xds_credentials()
.set_enable_rds_testing()),
&TestTypeName);
// EDS could be tested with or without XdsResolver, but the tests would

@ -1182,6 +1182,10 @@ src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/max_age/max_age_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/message_size/message_size_filter.h \
src/core/ext/filters/server_config_selector/server_config_selector.cc \
src/core/ext/filters/server_config_selector/server_config_selector.h \
src/core/ext/filters/server_config_selector/server_config_selector_filter.cc \
src/core/ext/filters/server_config_selector/server_config_selector_filter.h \
src/core/ext/service_config/service_config.cc \
src/core/ext/service_config/service_config.h \
src/core/ext/service_config/service_config_call_data.h \
@ -1709,6 +1713,8 @@ src/core/ext/xds/xds_http_fault_filter.cc \
src/core/ext/xds/xds_http_fault_filter.h \
src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_http_filters.h \
src/core/ext/xds/xds_routing.cc \
src/core/ext/xds/xds_routing.h \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/address_utils/parse_address.cc \
src/core/lib/address_utils/parse_address.h \

@ -1006,6 +1006,10 @@ src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/max_age/max_age_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/message_size/message_size_filter.h \
src/core/ext/filters/server_config_selector/server_config_selector.cc \
src/core/ext/filters/server_config_selector/server_config_selector.h \
src/core/ext/filters/server_config_selector/server_config_selector_filter.cc \
src/core/ext/filters/server_config_selector/server_config_selector_filter.h \
src/core/ext/service_config/service_config.cc \
src/core/ext/service_config/service_config.h \
src/core/ext/service_config/service_config_call_data.h \
@ -1503,6 +1507,8 @@ src/core/ext/xds/xds_http_fault_filter.cc \
src/core/ext/xds/xds_http_fault_filter.h \
src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_http_filters.h \
src/core/ext/xds/xds_routing.cc \
src/core/ext/xds/xds_routing.h \
src/core/ext/xds/xds_server_config_fetcher.cc \
src/core/lib/README.md \
src/core/lib/address_utils/parse_address.cc \

Loading…
Cancel
Save