[GCP auth filter] add "blackboard" mechanism for retaining global state, and use it for cache in GCP auth filter (#37646)

This is the last piece of gRFC A83 (https://github.com/grpc/proposal/pull/438).

Note that although this is the first use-case for this "blackboard" mechanism, we will also use it in the future for the xDS rate-limiting filter on the gRPC server side.

Closes #37646

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37646 from markdroth:gcp_auth_filter_state 72d0d96c79
PiperOrigin-RevId: 679707134
pull/37584/head
Mark D. Roth 2 months ago committed by Copybara-Service
parent 6dbc0bdb10
commit e56d766a45
  1. 2
      BUILD
  2. 62
      CMakeLists.txt
  3. 1
      Makefile
  4. 2
      Package.swift
  5. 57
      build_autogenerated.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      package.xml
  12. 24
      src/core/BUILD
  13. 7
      src/core/client_channel/client_channel.cc
  14. 3
      src/core/client_channel/client_channel.h
  15. 7
      src/core/client_channel/client_channel_filter.cc
  16. 3
      src/core/client_channel/client_channel_filter.h
  17. 12
      src/core/client_channel/dynamic_filters.cc
  18. 4
      src/core/client_channel/dynamic_filters.h
  19. 62
      src/core/ext/filters/gcp_authentication/gcp_authentication_filter.cc
  20. 32
      src/core/ext/filters/gcp_authentication/gcp_authentication_filter.h
  21. 33
      src/core/filter/blackboard.cc
  22. 71
      src/core/filter/blackboard.h
  23. 5
      src/core/lib/channel/channel_stack.cc
  24. 17
      src/core/lib/channel/channel_stack.h
  25. 2
      src/core/lib/channel/channel_stack_builder_impl.cc
  26. 13
      src/core/lib/channel/channel_stack_builder_impl.h
  27. 39
      src/core/lib/channel/promise_based_filter.h
  28. 17
      src/core/lib/transport/interception_chain.h
  29. 36
      src/core/util/lru_cache.h
  30. 1
      src/python/grpcio/grpc_core_dependencies.py
  31. 13
      test/core/filters/BUILD
  32. 66
      test/core/filters/blackboard_test.cc
  33. 33
      test/core/util/lru_cache_test.cc
  34. 69
      test/cpp/end2end/xds/xds_gcp_authn_end2end_test.cc
  35. 2
      tools/doxygen/Doxyfile.c++.internal
  36. 2
      tools/doxygen/Doxyfile.core.internal
  37. 24
      tools/run_tests/generated/tests.json

@ -2043,6 +2043,7 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:bitset",
"//src/core:blackboard",
"//src/core:call_destination",
"//src/core:call_filters",
"//src/core:call_final_info",
@ -3780,6 +3781,7 @@ grpc_cc_library(
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:backend_metric_parser",
"//src/core:blackboard",
"//src/core:call_destination",
"//src/core:call_filters",
"//src/core:call_spine",

62
CMakeLists.txt generated

@ -960,6 +960,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx binder_server_test)
add_dependencies(buildtests_cxx binder_transport_test)
add_dependencies(buildtests_cxx bitset_test)
add_dependencies(buildtests_cxx blackboard_test)
add_dependencies(buildtests_cxx buffer_list_test)
add_dependencies(buildtests_cxx byte_buffer_test)
add_dependencies(buildtests_cxx c_slice_buffer_test)
@ -2246,6 +2247,7 @@ add_library(grpc
src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.c
src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.c
src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c
src/core/filter/blackboard.cc
src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc
src/core/handshaker/handshaker.cc
src/core/handshaker/handshaker_registry.cc
@ -3053,6 +3055,7 @@ add_library(grpc_unsecure
src/core/ext/upb-gen/validate/validate.upb_minitable.c
src/core/ext/upb-gen/xds/data/orca/v3/orca_load_report.upb_minitable.c
src/core/ext/upb-gen/xds/service/orca/v3/orca.upb_minitable.c
src/core/filter/blackboard.cc
src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc
src/core/handshaker/handshaker.cc
src/core/handshaker/handshaker_registry.cc
@ -5280,6 +5283,7 @@ add_library(grpc_authorization_provider
src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.c
src/core/filter/blackboard.cc
src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc
src/core/handshaker/handshaker.cc
src/core/handshaker/handshaker_registry.cc
@ -8406,6 +8410,63 @@ target_link_libraries(bitset_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(blackboard_test
src/core/filter/blackboard.cc
src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/channel/channel_args.cc
src/core/lib/iomgr/sockaddr_utils_posix.cc
src/core/lib/iomgr/socket_utils_windows.cc
src/core/lib/surface/channel_stack_type.cc
src/core/resolver/endpoint_addresses.cc
src/core/util/ref_counted_string.cc
src/core/util/time.cc
src/core/util/uri.cc
test/core/filters/blackboard_test.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(blackboard_test
PRIVATE
"GPR_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(blackboard_test PUBLIC cxx_std_14)
target_include_directories(blackboard_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(blackboard_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::config
absl::flat_hash_map
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
gpr
)
endif()
if(gRPC_BUILD_TESTS)
@ -9012,6 +9073,7 @@ add_executable(call_utils_test
src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.c
src/core/filter/blackboard.cc
src/core/handshaker/handshaker_registry.cc
src/core/handshaker/proxy_mapper_registry.cc
src/core/lib/address_utils/parse_address.cc

1
Makefile generated

@ -1057,6 +1057,7 @@ LIBGRPC_SRC = \
src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c \
src/core/filter/blackboard.cc \
src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc \
src/core/handshaker/handshaker.cc \
src/core/handshaker/handshaker_registry.cc \

2
Package.swift generated

@ -1070,6 +1070,8 @@ let package = Package(
"src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h",
"src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c",
"src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h",
"src/core/filter/blackboard.cc",
"src/core/filter/blackboard.h",
"src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc",
"src/core/handshaker/endpoint_info/endpoint_info_handshaker.h",
"src/core/handshaker/handshaker.cc",

@ -781,6 +781,7 @@ libs:
- src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.h
- src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h
- src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h
- src/core/filter/blackboard.h
- src/core/handshaker/endpoint_info/endpoint_info_handshaker.h
- src/core/handshaker/handshaker.h
- src/core/handshaker/handshaker_factory.h
@ -1662,6 +1663,7 @@ libs:
- src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.c
- src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.c
- src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c
- src/core/filter/blackboard.cc
- src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc
- src/core/handshaker/handshaker.cc
- src/core/handshaker/handshaker_registry.cc
@ -2359,6 +2361,7 @@ libs:
- src/core/ext/upb-gen/xds/data/orca/v3/orca_load_report.upb_minitable.h
- src/core/ext/upb-gen/xds/service/orca/v3/orca.upb.h
- src/core/ext/upb-gen/xds/service/orca/v3/orca.upb_minitable.h
- src/core/filter/blackboard.h
- src/core/handshaker/endpoint_info/endpoint_info_handshaker.h
- src/core/handshaker/handshaker.h
- src/core/handshaker/handshaker_factory.h
@ -2819,6 +2822,7 @@ libs:
- src/core/ext/upb-gen/validate/validate.upb_minitable.c
- src/core/ext/upb-gen/xds/data/orca/v3/orca_load_report.upb_minitable.c
- src/core/ext/upb-gen/xds/service/orca/v3/orca.upb_minitable.c
- src/core/filter/blackboard.cc
- src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc
- src/core/handshaker/handshaker.cc
- src/core/handshaker/handshaker_registry.cc
@ -4467,6 +4471,7 @@ libs:
- src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.h
- src/core/filter/blackboard.h
- src/core/handshaker/endpoint_info/endpoint_info_handshaker.h
- src/core/handshaker/handshaker.h
- src/core/handshaker/handshaker_factory.h
@ -4803,6 +4808,7 @@ libs:
- src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.c
- src/core/filter/blackboard.cc
- src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc
- src/core/handshaker/handshaker.cc
- src/core/handshaker/handshaker_registry.cc
@ -6278,6 +6284,55 @@ targets:
- absl/log:check
- absl/numeric:bits
uses_polling: false
- name: blackboard_test
gtest: true
build: test
language: c++
headers:
- src/core/filter/blackboard.h
- src/core/lib/address_utils/sockaddr_utils.h
- src/core/lib/channel/channel_args.h
- src/core/lib/iomgr/port.h
- src/core/lib/iomgr/resolved_address.h
- src/core/lib/iomgr/sockaddr.h
- src/core/lib/iomgr/sockaddr_posix.h
- src/core/lib/iomgr/sockaddr_windows.h
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/surface/channel_stack_type.h
- src/core/resolver/endpoint_addresses.h
- src/core/util/atomic_utils.h
- src/core/util/avl.h
- src/core/util/down_cast.h
- src/core/util/dual_ref_counted.h
- src/core/util/orphanable.h
- src/core/util/ref_counted.h
- src/core/util/ref_counted_ptr.h
- src/core/util/ref_counted_string.h
- src/core/util/time.h
- src/core/util/unique_type_name.h
- src/core/util/uri.h
src:
- src/core/filter/blackboard.cc
- src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/channel/channel_args.cc
- src/core/lib/iomgr/sockaddr_utils_posix.cc
- src/core/lib/iomgr/socket_utils_windows.cc
- src/core/lib/surface/channel_stack_type.cc
- src/core/resolver/endpoint_addresses.cc
- src/core/util/ref_counted_string.cc
- src/core/util/time.cc
- src/core/util/uri.cc
- test/core/filters/blackboard_test.cc
deps:
- gtest
- absl/base:config
- absl/container:flat_hash_map
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- gpr
uses_polling: false
- name: buffer_list_test
gtest: true
build: test
@ -6738,6 +6793,7 @@ targets:
- src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb.h
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.h
- src/core/filter/blackboard.h
- src/core/handshaker/handshaker_factory.h
- src/core/handshaker/handshaker_registry.h
- src/core/handshaker/proxy_mapper.h
@ -7043,6 +7099,7 @@ targets:
- src/core/ext/upb-gen/src/proto/grpc/gcp/altscontext.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/gcp/handshaker.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/gcp/transport_security_common.upb_minitable.c
- src/core/filter/blackboard.cc
- src/core/handshaker/handshaker_registry.cc
- src/core/handshaker/proxy_mapper_registry.cc
- src/core/lib/address_utils/parse_address.cc

2
config.m4 generated

@ -432,6 +432,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c \
src/core/filter/blackboard.cc \
src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc \
src/core/handshaker/handshaker.cc \
src/core/handshaker/handshaker_registry.cc \
@ -1528,6 +1529,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-gen/xds/core/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-gen/xds/type/matcher/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/upbdefs-gen/xds/type/v3)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/filter)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/handshaker)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/handshaker/endpoint_info)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/handshaker/http_connect)

2
config.w32 generated

@ -397,6 +397,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\upbdefs-gen\\xds\\type\\v3\\cel.upbdefs.c " +
"src\\core\\ext\\upbdefs-gen\\xds\\type\\v3\\range.upbdefs.c " +
"src\\core\\ext\\upbdefs-gen\\xds\\type\\v3\\typed_struct.upbdefs.c " +
"src\\core\\filter\\blackboard.cc " +
"src\\core\\handshaker\\endpoint_info\\endpoint_info_handshaker.cc " +
"src\\core\\handshaker\\handshaker.cc " +
"src\\core\\handshaker\\handshaker_registry.cc " +
@ -1666,6 +1667,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-gen\\xds\\type\\matcher");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-gen\\xds\\type\\matcher\\v3");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\upbdefs-gen\\xds\\type\\v3");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\filter");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\handshaker");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\handshaker\\endpoint_info");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\handshaker\\http_connect");

2
gRPC-C++.podspec generated

@ -870,6 +870,7 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.h',
'src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h',
'src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h',
'src/core/filter/blackboard.h',
'src/core/handshaker/endpoint_info/endpoint_info_handshaker.h',
'src/core/handshaker/handshaker.h',
'src/core/handshaker/handshaker_factory.h',
@ -2173,6 +2174,7 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.h',
'src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h',
'src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h',
'src/core/filter/blackboard.h',
'src/core/handshaker/endpoint_info/endpoint_info_handshaker.h',
'src/core/handshaker/handshaker.h',
'src/core/handshaker/handshaker_factory.h',

3
gRPC-Core.podspec generated

@ -1190,6 +1190,8 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h',
'src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c',
'src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h',
'src/core/filter/blackboard.cc',
'src/core/filter/blackboard.h',
'src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc',
'src/core/handshaker/endpoint_info/endpoint_info_handshaker.h',
'src/core/handshaker/handshaker.cc',
@ -2960,6 +2962,7 @@ Pod::Spec.new do |s|
'src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.h',
'src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h',
'src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h',
'src/core/filter/blackboard.h',
'src/core/handshaker/endpoint_info/endpoint_info_handshaker.h',
'src/core/handshaker/handshaker.h',
'src/core/handshaker/handshaker_factory.h',

2
grpc.gemspec generated

@ -1076,6 +1076,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h )
s.files += %w( src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c )
s.files += %w( src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h )
s.files += %w( src/core/filter/blackboard.cc )
s.files += %w( src/core/filter/blackboard.h )
s.files += %w( src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc )
s.files += %w( src/core/handshaker/endpoint_info/endpoint_info_handshaker.h )
s.files += %w( src/core/handshaker/handshaker.cc )

2
package.xml generated

@ -1058,6 +1058,8 @@
<file baseinstalldir="/" name="src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h" role="src" />
<file baseinstalldir="/" name="src/core/filter/blackboard.cc" role="src" />
<file baseinstalldir="/" name="src/core/filter/blackboard.h" role="src" />
<file baseinstalldir="/" name="src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc" role="src" />
<file baseinstalldir="/" name="src/core/handshaker/endpoint_info/endpoint_info_handshaker.h" role="src" />
<file baseinstalldir="/" name="src/core/handshaker/handshaker.cc" role="src" />

@ -3309,6 +3309,29 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "blackboard",
srcs = [
"filter/blackboard.cc",
],
hdrs = [
"filter/blackboard.h",
],
external_deps = [
"absl/container:flat_hash_map",
"absl/strings",
],
language = "c++",
deps = [
"ref_counted",
"unique_type_name",
"useful",
"//:debug_location",
"//:endpoint_addresses",
"//:ref_counted_ptr",
],
)
grpc_cc_library(
name = "subchannel_connector",
hdrs = [
@ -5034,6 +5057,7 @@ grpc_cc_library(
language = "c++",
deps = [
"arena",
"blackboard",
"channel_args",
"channel_fwd",
"context",

@ -1279,8 +1279,12 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked(
config_selector =
MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
}
// Modify channel args.
ChannelArgs new_args = args.SetObject(this).SetObject(saved_service_config_);
// Construct filter stack.
InterceptionChainBuilder builder(args.SetObject(this));
auto new_blackboard = MakeRefCounted<Blackboard>();
InterceptionChainBuilder builder(new_args, blackboard_.get(),
new_blackboard.get());
if (idle_timeout_ != Duration::Zero()) {
builder.AddOnServerTrailingMetadata([this](ServerMetadata&) {
if (idle_state_.DecreaseCallCount()) StartIdleTimer();
@ -1300,6 +1304,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked(
}
// Create call destination.
auto top_of_stack_call_destination = builder.Build(call_destination_);
blackboard_ = std::move(new_blackboard);
// Send result to data plane.
if (!top_of_stack_call_destination.ok()) {
resolver_data_for_calls_.Set(MaybeRewriteIllegalStatusCode(

@ -27,6 +27,7 @@
#include "src/core/client_channel/config_selector.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/ext/filters/channel_idle/idle_filter_state.h"
#include "src/core/filter/blackboard.h"
#include "src/core/lib/promise/observable.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/metadata.h"
@ -215,6 +216,8 @@ class ClientChannel : public Channel {
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<ConfigSelector> saved_config_selector_
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<const Blackboard> blackboard_
ABSL_GUARDED_BY(*work_serializer_);
OrphanablePtr<LoadBalancingPolicy> lb_policy_
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_

@ -1498,6 +1498,7 @@ void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked(
config_selector =
MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
}
// Modify channel args.
ChannelArgs new_args = args.SetObject(this).SetObject(service_config);
bool enable_retries =
!new_args.WantMinimalStack() &&
@ -1510,9 +1511,11 @@ void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked(
} else {
filters.push_back(&DynamicTerminationFilter::kFilterVtable);
}
RefCountedPtr<DynamicFilters> dynamic_filters =
DynamicFilters::Create(new_args, std::move(filters));
auto new_blackboard = MakeRefCounted<Blackboard>();
RefCountedPtr<DynamicFilters> dynamic_filters = DynamicFilters::Create(
new_args, std::move(filters), blackboard_.get(), new_blackboard.get());
CHECK(dynamic_filters != nullptr);
blackboard_ = std::move(new_blackboard);
// Grab data plane lock to update service config.
//
// We defer unreffing the old values (and deallocating memory) until

@ -43,6 +43,7 @@
#include "src/core/client_channel/dynamic_filters.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_pool_interface.h"
#include "src/core/filter/blackboard.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
@ -316,6 +317,8 @@ class ClientChannelFilter final {
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<ConfigSelector> saved_config_selector_
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<const Blackboard> blackboard_
ABSL_GUARDED_BY(*work_serializer_);
OrphanablePtr<LoadBalancingPolicy> lb_policy_
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_

@ -139,8 +139,10 @@ void DynamicFilters::Call::IncrementRefCount(const DebugLocation& /*location*/,
namespace {
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters,
const Blackboard* old_blackboard, Blackboard* new_blackboard) {
ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC, args);
builder.SetBlackboards(old_blackboard, new_blackboard);
for (auto filter : filters) {
builder.AppendFilter(filter);
}
@ -150,15 +152,17 @@ absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
} // namespace
RefCountedPtr<DynamicFilters> DynamicFilters::Create(
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters,
const Blackboard* old_blackboard, Blackboard* new_blackboard) {
// Attempt to create channel stack from requested filters.
auto p = CreateChannelStack(args, std::move(filters));
auto p = CreateChannelStack(args, std::move(filters), old_blackboard,
new_blackboard);
if (!p.ok()) {
// Channel stack creation failed with requested filters.
// Create with lame filter instead.
auto error = p.status();
p = CreateChannelStack(args.Set(MakeLameClientErrorArg(&error)),
{&LameClientFilter::kFilter});
{&LameClientFilter::kFilter}, nullptr, nullptr);
}
return MakeRefCounted<DynamicFilters>(std::move(p.value()));
}

@ -24,6 +24,7 @@
#include <grpc/slice.h>
#include "src/core/filter/blackboard.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
@ -90,7 +91,8 @@ class DynamicFilters final : public RefCounted<DynamicFilters> {
};
static RefCountedPtr<DynamicFilters> Create(
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters);
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters,
const Blackboard* old_blackboard, Blackboard* new_blackboard);
explicit DynamicFilters(RefCountedPtr<grpc_channel_stack> channel_stack)
: channel_stack_(std::move(channel_stack)) {}

@ -37,6 +37,10 @@
namespace grpc_core {
//
// GcpAuthenticationFilter::Call
//
const NoInterceptor GcpAuthenticationFilter::Call::OnClientToServerMessage;
const NoInterceptor GcpAuthenticationFilter::Call::OnClientToServerHalfClose;
const NoInterceptor GcpAuthenticationFilter::Call::OnServerInitialMetadata;
@ -98,7 +102,7 @@ absl::Status GcpAuthenticationFilter::Call::OnClientInitialMetadata(
cluster_name));
}
// Get the call creds instance.
auto creds = filter->GetCallCredentials(
auto creds = filter->cache_->Get(
DownCast<const XdsGcpAuthnAudienceMetadataValue*>(metadata_value)->url());
// Add the call creds instance to the call.
auto* arena = GetContext<Arena>();
@ -113,6 +117,34 @@ absl::Status GcpAuthenticationFilter::Call::OnClientInitialMetadata(
return absl::OkStatus();
}
//
// GcpAuthenticationFilter::CallCredentialsCache
//
UniqueTypeName GcpAuthenticationFilter::CallCredentialsCache::Type() {
static UniqueTypeName::Factory factory("gcp_auth_call_creds_cache");
return factory.Create();
}
void GcpAuthenticationFilter::CallCredentialsCache::SetMaxSize(
size_t max_size) {
MutexLock lock(&mu_);
cache_.SetMaxSize(max_size);
}
RefCountedPtr<grpc_call_credentials>
GcpAuthenticationFilter::CallCredentialsCache::Get(
const std::string& audience) {
MutexLock lock(&mu_);
return cache_.GetOrInsert(audience, [](const std::string& audience) {
return MakeRefCounted<GcpServiceAccountIdentityCallCredentials>(audience);
});
}
//
// GcpAuthenticationFilter
//
const grpc_channel_filter GcpAuthenticationFilter::kFilter =
MakePromiseBasedFilter<GcpAuthenticationFilter, FilterEndpoint::kClient,
0>();
@ -120,6 +152,7 @@ const grpc_channel_filter GcpAuthenticationFilter::kFilter =
absl::StatusOr<std::unique_ptr<GcpAuthenticationFilter>>
GcpAuthenticationFilter::Create(const ChannelArgs& args,
ChannelFilter::Args filter_args) {
// Get filter config.
auto* service_config = args.GetObject<ServiceConfig>();
if (service_config == nullptr) {
return absl::InvalidArgumentError(
@ -136,29 +169,32 @@ GcpAuthenticationFilter::Create(const ChannelArgs& args,
return absl::InvalidArgumentError(
"gcp_auth: filter instance ID not found in filter config");
}
// Get XdsConfig so that we can look up CDS resources.
auto xds_config = args.GetObjectRef<XdsConfig>();
if (xds_config == nullptr) {
return absl::InvalidArgumentError(
"gcp_auth: xds config not found in channel args");
}
return std::make_unique<GcpAuthenticationFilter>(filter_config,
std::move(xds_config));
// Get existing cache or create new one.
auto cache = filter_args.GetOrCreateState<CallCredentialsCache>(
filter_config->filter_instance_name, [&]() {
return MakeRefCounted<CallCredentialsCache>(filter_config->cache_size);
});
// Make sure size is updated, in case we're reusing a pre-existing
// cache but it has the wrong size.
cache->SetMaxSize(filter_config->cache_size);
// Instantiate filter.
return std::unique_ptr<GcpAuthenticationFilter>(new GcpAuthenticationFilter(
filter_config, std::move(xds_config), std::move(cache)));
}
GcpAuthenticationFilter::GcpAuthenticationFilter(
const GcpAuthenticationParsedConfig::Config* filter_config,
RefCountedPtr<const XdsConfig> xds_config)
RefCountedPtr<const XdsConfig> xds_config,
RefCountedPtr<CallCredentialsCache> cache)
: filter_config_(filter_config),
xds_config_(std::move(xds_config)),
cache_(filter_config->cache_size) {}
RefCountedPtr<grpc_call_credentials>
GcpAuthenticationFilter::GetCallCredentials(const std::string& audience) {
MutexLock lock(&mu_);
return cache_.GetOrInsert(audience, [](const std::string& audience) {
return MakeRefCounted<GcpServiceAccountIdentityCallCredentials>(audience);
});
}
cache_(std::move(cache)) {}
void GcpAuthenticationFilterRegister(CoreConfiguration::Builder* builder) {
GcpAuthenticationServiceConfigParser::Register(builder);

@ -25,6 +25,7 @@
#include "absl/strings/string_view.h"
#include "src/core/ext/filters/gcp_authentication/gcp_authentication_service_config_parser.h"
#include "src/core/filter/blackboard.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
@ -49,10 +50,6 @@ class GcpAuthenticationFilter
static absl::StatusOr<std::unique_ptr<GcpAuthenticationFilter>> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
GcpAuthenticationFilter(
const GcpAuthenticationParsedConfig::Config* filter_config,
RefCountedPtr<const XdsConfig> xds_config);
class Call {
public:
absl::Status OnClientInitialMetadata(ClientMetadata& /*md*/,
@ -66,15 +63,30 @@ class GcpAuthenticationFilter
};
private:
RefCountedPtr<grpc_call_credentials> GetCallCredentials(
const std::string& audience);
class CallCredentialsCache : public Blackboard::Entry {
public:
explicit CallCredentialsCache(size_t max_size) : cache_(max_size) {}
static UniqueTypeName Type();
void SetMaxSize(size_t max_size);
RefCountedPtr<grpc_call_credentials> Get(const std::string& audience);
private:
Mutex mu_;
LruCache<std::string /*audience*/, RefCountedPtr<grpc_call_credentials>>
cache_ ABSL_GUARDED_BY(&mu_);
};
GcpAuthenticationFilter(
const GcpAuthenticationParsedConfig::Config* filter_config,
RefCountedPtr<const XdsConfig> xds_config,
RefCountedPtr<CallCredentialsCache> cache);
const GcpAuthenticationParsedConfig::Config* filter_config_;
const RefCountedPtr<const XdsConfig> xds_config_;
Mutex mu_;
LruCache<std::string /*audience*/, RefCountedPtr<grpc_call_credentials>>
cache_ ABSL_GUARDED_BY(&mu_);
const RefCountedPtr<CallCredentialsCache> cache_;
};
} // namespace grpc_core

@ -0,0 +1,33 @@
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "src/core/filter/blackboard.h"
namespace grpc_core {
RefCountedPtr<Blackboard::Entry> Blackboard::Get(UniqueTypeName type,
const std::string& key) const {
auto it = map_.find(std::make_pair(type, key));
if (it == map_.end()) return nullptr;
return it->second;
}
void Blackboard::Set(UniqueTypeName type, const std::string& key,
RefCountedPtr<Entry> entry) {
map_[std::make_pair(type, key)] = std::move(entry);
}
} // namespace grpc_core

@ -0,0 +1,71 @@
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_FILTER_BLACKBOARD_H
#define GRPC_SRC_CORE_FILTER_BLACKBOARD_H
#include <string>
#include <utility>
#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/unique_type_name.h"
#include "src/core/util/useful.h"
namespace grpc_core {
// A blackboard is a place where dynamic filters can stash global state
// that they may want to retain across resolver updates. Entries are
// identified by by the unique type and a name that identifies the instance,
// which means that it's possible for two filter instances to use the same
// type (e.g., if there are two instantiations of the same filter).
class Blackboard : public RefCounted<Blackboard> {
public:
// All entries must derive from this type.
// They must also have a static method with the following signature:
// static UniqueTypeName Type();
class Entry : public RefCounted<Entry> {};
// Returns an entry for a particular type and name, or null if not present.
template <typename T>
RefCountedPtr<T> Get(const std::string& key) const {
return Get(T::Type(), key).template TakeAsSubclass<T>();
}
// Sets an entry for a particular type and name.
template <typename T>
void Set(const std::string& key, RefCountedPtr<T> entry) {
Set(T::Type(), key, std::move(entry));
}
private:
RefCountedPtr<Entry> Get(UniqueTypeName type, const std::string& key) const;
void Set(UniqueTypeName type, const std::string& key,
RefCountedPtr<Entry> entry);
absl::flat_hash_map<std::pair<UniqueTypeName, std::string>,
RefCountedPtr<Entry>>
map_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_FILTER_BLACKBOARD_H

@ -116,7 +116,8 @@ grpc_error_handle grpc_channel_stack_init(
int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
const grpc_channel_filter** filters, size_t filter_count,
const grpc_core::ChannelArgs& channel_args, const char* name,
grpc_channel_stack* stack) {
grpc_channel_stack* stack, const grpc_core::Blackboard* old_blackboard,
grpc_core::Blackboard* new_blackboard) {
if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) {
LOG(INFO) << "CHANNEL_STACK: init " << name;
for (size_t i = 0; i < filter_count; i++) {
@ -145,6 +146,8 @@ grpc_error_handle grpc_channel_stack_init(
sizeof(grpc_channel_element));
// init per-filter data
args.old_blackboard = old_blackboard;
args.new_blackboard = new_blackboard;
grpc_error_handle first_error;
for (i = 0; i < filter_count; i++) {
args.channel_stack = stack;

@ -19,13 +19,6 @@
#ifndef GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
#define GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
//////////////////////////////////////////////////////////////////////////////
// IMPORTANT NOTE:
//
// When you update this API, please make the corresponding changes to
// the C++ API in src/cpp/common/channel_filter.{h,cc}
//////////////////////////////////////////////////////////////////////////////
// A channel filter defines how operations on a channel are implemented.
// Channel filters are chained together to create full channels, and if those
// chains are linear, then channel stacks provide a mechanism to minimize
@ -74,11 +67,17 @@
#include "src/core/util/time_precise.h"
#include "src/core/util/unique_type_name.h"
namespace grpc_core {
class Blackboard;
} // namespace grpc_core
struct grpc_channel_element_args {
grpc_channel_stack* channel_stack;
grpc_core::ChannelArgs channel_args;
int is_first;
int is_last;
const grpc_core::Blackboard* old_blackboard;
grpc_core::Blackboard* new_blackboard;
};
struct grpc_call_element_args {
grpc_call_stack* call_stack;
@ -262,7 +261,9 @@ grpc_error_handle grpc_channel_stack_init(
int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
const grpc_channel_filter** filters, size_t filter_count,
const grpc_core::ChannelArgs& args, const char* name,
grpc_channel_stack* stack);
grpc_channel_stack* stack,
const grpc_core::Blackboard* old_blackboard = nullptr,
grpc_core::Blackboard* new_blackboard = nullptr);
// Destroy a channel stack
void grpc_channel_stack_destroy(grpc_channel_stack* stack);

@ -77,7 +77,7 @@ ChannelStackBuilderImpl::Build() {
gpr_free(stk);
},
channel_stack, stack.data(), stack.size(), channel_args(), name(),
channel_stack);
channel_stack, old_blackboard_, new_blackboard_);
if (!error.ok()) {
grpc_channel_stack_destroy(channel_stack);

@ -25,6 +25,8 @@
namespace grpc_core {
class Blackboard;
// Build a channel stack.
// Allows interested parties to add filters to the stack, and to query an
// in-progress build.
@ -34,13 +36,24 @@ class ChannelStackBuilderImpl final : public ChannelStackBuilder {
public:
using ChannelStackBuilder::ChannelStackBuilder;
void SetBlackboards(const Blackboard* old_blackboard,
Blackboard* new_blackboard) {
old_blackboard_ = old_blackboard;
new_blackboard_ = new_blackboard;
}
// Build the channel stack.
// After success, *result holds the new channel stack,
// prefix_bytes are allocated before the channel stack,
// initial_refs, destroy, destroy_arg are as per grpc_channel_stack_init
// On failure, *result is nullptr.
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> Build() override;
private:
const Blackboard* old_blackboard_ = nullptr;
Blackboard* new_blackboard_ = nullptr;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_IMPL_H

@ -42,6 +42,7 @@
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include "src/core/filter/blackboard.h"
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
@ -81,15 +82,23 @@ class ChannelFilter {
public:
Args() : Args(nullptr, nullptr) {}
Args(grpc_channel_stack* channel_stack,
grpc_channel_element* channel_element)
: impl_(ChannelStackBased{channel_stack, channel_element}) {}
grpc_channel_element* channel_element,
const Blackboard* old_blackboard = nullptr,
Blackboard* new_blackboard = nullptr)
: impl_(ChannelStackBased{channel_stack, channel_element}),
old_blackboard_(old_blackboard),
new_blackboard_(new_blackboard) {}
// While we're moving to call-v3 we need to have access to
// grpc_channel_stack & friends here. That means that we can't rely on this
// type signature from interception_chain.h, which means that we need a way
// of constructing this object without naming it ===> implicit construction.
// TODO(ctiller): remove this once we're fully on call-v3
// NOLINTNEXTLINE(google-explicit-constructor)
Args(size_t instance_id) : impl_(V3Based{instance_id}) {}
Args(size_t instance_id, const Blackboard* old_blackboard = nullptr,
Blackboard* new_blackboard = nullptr)
: impl_(V3Based{instance_id}),
old_blackboard_(old_blackboard),
new_blackboard_(new_blackboard) {}
ABSL_DEPRECATED("Direct access to channel stack is deprecated")
grpc_channel_stack* channel_stack() const {
@ -113,6 +122,21 @@ class ChannelFilter {
[](const V3Based& v3) { return v3.instance_id; });
}
// If a filter state object of type T exists for key from a previous
// filter stack, retains it for the new filter stack we're constructing.
// Otherwise, invokes create_func() to create a new filter state
// object for the new filter stack. Returns the new filter state object.
template <typename T>
RefCountedPtr<T> GetOrCreateState(
const std::string& key,
absl::FunctionRef<RefCountedPtr<T>()> create_func) {
RefCountedPtr<T> state;
if (old_blackboard_ != nullptr) state = old_blackboard_->Get<T>(key);
if (state == nullptr) state = create_func();
if (new_blackboard_ != nullptr) new_blackboard_->Set(key, state);
return state;
}
private:
friend class ChannelFilter;
@ -127,6 +151,9 @@ class ChannelFilter {
using Impl = absl::variant<ChannelStackBased, V3Based>;
Impl impl_;
const Blackboard* old_blackboard_ = nullptr;
Blackboard* new_blackboard_ = nullptr;
};
// Perform post-initialization step (if any).
@ -1620,8 +1647,10 @@ struct ChannelFilterWithFlagsMethods {
static absl::Status InitChannelElem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
CHECK(args->is_last == ((kFlags & kFilterIsLast) != 0));
auto status = F::Create(args->channel_args,
ChannelFilter::Args(args->channel_stack, elem));
auto status = F::Create(
args->channel_args,
ChannelFilter::Args(args->channel_stack, elem, args->old_blackboard,
args->new_blackboard));
if (!status.ok()) {
new (elem->channel_data) F*(nullptr);
return absl_status_to_grpc_error(status.status());

@ -28,6 +28,7 @@
namespace grpc_core {
class Blackboard;
class InterceptionChainBuilder;
// One hijacked call. Using this we can get access to the CallHandler for the
@ -153,8 +154,12 @@ class InterceptionChainBuilder final {
absl::variant<RefCountedPtr<UnstartedCallDestination>,
RefCountedPtr<CallDestination>>;
explicit InterceptionChainBuilder(ChannelArgs args)
: args_(std::move(args)) {}
explicit InterceptionChainBuilder(ChannelArgs args,
const Blackboard* old_blackboard = nullptr,
Blackboard* new_blackboard = nullptr)
: args_(std::move(args)),
old_blackboard_(old_blackboard),
new_blackboard_(new_blackboard) {}
// Add a filter with a `Call` class as an inner member.
// Call class must be one compatible with the filters described in
@ -163,7 +168,8 @@ class InterceptionChainBuilder final {
absl::enable_if_t<sizeof(typename T::Call) != 0, InterceptionChainBuilder&>
Add() {
if (!status_.ok()) return *this;
auto filter = T::Create(args_, {FilterInstanceId(FilterTypeId<T>())});
auto filter = T::Create(args_, {FilterInstanceId(FilterTypeId<T>()),
old_blackboard_, new_blackboard_});
if (!filter.ok()) {
status_ = filter.status();
return *this;
@ -179,7 +185,8 @@ class InterceptionChainBuilder final {
absl::enable_if_t<std::is_base_of<Interceptor, T>::value,
InterceptionChainBuilder&>
Add() {
AddInterceptor(T::Create(args_, {FilterInstanceId(FilterTypeId<T>())}));
AddInterceptor(T::Create(args_, {FilterInstanceId(FilterTypeId<T>()),
old_blackboard_, new_blackboard_}));
return *this;
};
@ -237,6 +244,8 @@ class InterceptionChainBuilder final {
absl::Status status_;
std::map<size_t, size_t> filter_type_counts_;
static std::atomic<size_t> next_filter_id_;
const Blackboard* old_blackboard_ = nullptr;
Blackboard* new_blackboard_ = nullptr;
};
} // namespace grpc_core

@ -47,6 +47,11 @@ class LruCache {
// to be too large, removes the least recently used entry.
Value GetOrInsert(Key key, absl::AnyInvocable<Value(const Key&)> create);
// Changes the max size of the cache. If there are currently more than
// max_size entries, deletes least-recently-used entries to enforce
// the new max size.
void SetMaxSize(size_t max_size);
private:
struct CacheEntry {
Value value;
@ -55,7 +60,9 @@ class LruCache {
explicit CacheEntry(Value v) : value(std::move(v)) {}
};
const size_t max_size_;
void RemoveOldestEntry();
size_t max_size_;
absl::flat_hash_map<Key, CacheEntry> cache_;
std::list<Key> lru_list_;
};
@ -82,14 +89,7 @@ Value LruCache<Key, Value>::GetOrInsert(
if (value.has_value()) return std::move(*value);
// Entry not found. We'll need to insert a new entry.
// If the cache is at max size, remove the least recently used entry.
if (cache_.size() == max_size_) {
auto lru_it = lru_list_.begin();
CHECK(lru_it != lru_list_.end());
auto cache_it = cache_.find(*lru_it);
CHECK(cache_it != cache_.end());
cache_.erase(cache_it);
lru_list_.pop_front();
}
if (cache_.size() == max_size_) RemoveOldestEntry();
// Create a new entry, insert it, and return it.
auto it = cache_
.emplace(std::piecewise_construct, std::forward_as_tuple(key),
@ -99,6 +99,24 @@ Value LruCache<Key, Value>::GetOrInsert(
return it->second.value;
}
template <typename Key, typename Value>
void LruCache<Key, Value>::SetMaxSize(size_t max_size) {
max_size_ = max_size;
while (cache_.size() > max_size_) {
RemoveOldestEntry();
}
}
template <typename Key, typename Value>
void LruCache<Key, Value>::RemoveOldestEntry() {
auto lru_it = lru_list_.begin();
CHECK(lru_it != lru_list_.end());
auto cache_it = cache_.find(*lru_it);
CHECK(cache_it != cache_.end());
cache_.erase(cache_it);
lru_list_.pop_front();
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_UTIL_LRU_CACHE_H

@ -406,6 +406,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/upbdefs-gen/xds/type/v3/cel.upbdefs.c',
'src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.c',
'src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c',
'src/core/filter/blackboard.cc',
'src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc',
'src/core/handshaker/handshaker.cc',
'src/core/handshaker/handshaker_registry.cc',

@ -150,3 +150,16 @@ grpc_cc_benchmark(
"//test/core/transport:call_spine_benchmarks",
],
)
grpc_cc_test(
name = "blackboard_test",
srcs = ["blackboard_test.cc"],
external_deps = [
"gtest",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:blackboard",
],
)

@ -0,0 +1,66 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/filter/blackboard.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
namespace grpc_core {
namespace {
class FooEntry : public Blackboard::Entry {
public:
static UniqueTypeName Type() {
static UniqueTypeName::Factory kFactory("FooEntry");
return kFactory.Create();
}
};
class BarEntry : public Blackboard::Entry {
public:
static UniqueTypeName Type() {
static UniqueTypeName::Factory kFactory("BarEntry");
return kFactory.Create();
}
};
TEST(Blackboard, Basic) {
Blackboard blackboard;
// No entry for type FooEntry key "foo".
EXPECT_EQ(blackboard.Get<FooEntry>("a"), nullptr);
// Set entry for type FooEntry key "foo".
auto foo_entry = MakeRefCounted<FooEntry>();
blackboard.Set("a", foo_entry);
// Get the entry we just added.
EXPECT_EQ(blackboard.Get<FooEntry>("a"), foo_entry);
// A different key for the same type is still unset.
EXPECT_EQ(blackboard.Get<FooEntry>("b"), nullptr);
// The same key for a different type is still unset.
EXPECT_EQ(blackboard.Get<BarEntry>("a"), nullptr);
// Set entry for type BarEntry key "foo".
auto bar_entry = MakeRefCounted<BarEntry>();
blackboard.Set("a", bar_entry);
EXPECT_EQ(blackboard.Get<BarEntry>("a"), bar_entry);
// This should not have replaced the same key for FooEntry.
EXPECT_EQ(blackboard.Get<FooEntry>("a"), foo_entry);
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -66,6 +66,39 @@ TEST(LruCache, Basic) {
EXPECT_THAT(created_list, ::testing::ElementsAreArray(kOrder2));
}
TEST(LruCache, SetMaxSize) {
auto create = [&](const std::string& key) {
int value;
CHECK(absl::SimpleAtoi(key, &value));
return value;
};
// Create a cache with max size 10.
LruCache<std::string, int> cache(10);
// Insert 10 values.
for (int i = 1; i <= 10; ++i) {
std::string key = absl::StrCat(i);
EXPECT_EQ(absl::nullopt, cache.Get(key));
EXPECT_EQ(i, cache.GetOrInsert(key, create));
EXPECT_EQ(i, cache.Get(key));
}
// Set max size to 15. All elements should still be present.
cache.SetMaxSize(15);
for (int i = 1; i <= 10; ++i) {
std::string key = absl::StrCat(i);
EXPECT_EQ(i, cache.Get(key));
}
// Set max size to 6. This should remove the first 4 elements.
cache.SetMaxSize(6);
for (int i = 1; i <= 4; ++i) {
std::string key = absl::StrCat(i);
EXPECT_EQ(absl::nullopt, cache.Get(key)) << i;
}
for (int i = 5; i <= 10; ++i) {
std::string key = absl::StrCat(i);
EXPECT_EQ(i, cache.Get(key));
}
}
} // namespace grpc_core
int main(int argc, char** argv) {

@ -50,6 +50,7 @@ class XdsGcpAuthnEnd2endTest : public XdsEnd2endTest {
void SetUp() override {
g_audience = "";
g_token = nullptr;
g_num_token_fetches = 0;
grpc_core::HttpRequest::SetOverride(HttpGetOverride, nullptr, nullptr);
InitClient(MakeBootstrapBuilder(), /*lb_expected_authority=*/"",
/*xds_resource_does_not_exist_timeout_ms=*/0,
@ -83,6 +84,7 @@ class XdsGcpAuthnEnd2endTest : public XdsEnd2endTest {
"/computeMetadata/v1/instance/service-accounts/default/identity") {
return 0;
}
g_num_token_fetches.fetch_add(1);
// Validate request.
ValidateHttpRequest(request, uri);
// Generate response.
@ -126,10 +128,12 @@ class XdsGcpAuthnEnd2endTest : public XdsEnd2endTest {
static absl::string_view g_audience;
static const char* g_token;
static std::atomic<size_t> g_num_token_fetches;
};
absl::string_view XdsGcpAuthnEnd2endTest::g_audience;
const char* XdsGcpAuthnEnd2endTest::g_token;
std::atomic<size_t> XdsGcpAuthnEnd2endTest::g_num_token_fetches;
INSTANTIATE_TEST_SUITE_P(XdsTest, XdsGcpAuthnEnd2endTest,
::testing::Values(XdsTestType()), &XdsTestType::Name);
@ -159,6 +163,7 @@ TEST_P(XdsGcpAuthnEnd2endTest, Basic) {
EXPECT_THAT(server_initial_metadata,
::testing::Contains(::testing::Pair(
"authorization", absl::StrCat("Bearer ", g_token))));
EXPECT_EQ(g_num_token_fetches.load(), 1);
}
TEST_P(XdsGcpAuthnEnd2endTest, NoOpWhenClusterHasNoAudience) {
@ -183,6 +188,70 @@ TEST_P(XdsGcpAuthnEnd2endTest, NoOpWhenClusterHasNoAudience) {
::testing::Not(::testing::Contains(::testing::Key("authorization"))));
}
TEST_P(XdsGcpAuthnEnd2endTest, CacheRetainedAcrossXdsUpdates) {
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_GCP_AUTHENTICATION_FILTER");
// Construct auth token.
g_audience = kAudience;
std::string token = MakeToken(grpc_core::Timestamp::InfFuture());
g_token = token.c_str();
// Set xDS resources.
CreateAndStartBackends(1, /*xds_enabled=*/false,
CreateTlsServerCredentials());
SetListenerAndRouteConfiguration(balancer_.get(),
BuildListenerWithGcpAuthnFilter(),
default_route_config_);
balancer_->ads_service()->SetCdsResource(BuildClusterWithAudience(kAudience));
EdsResourceArgs args({{"locality0", {CreateEndpoint(0)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send an RPC and check that it arrives with the right auth token.
std::multimap<std::string, std::string> server_initial_metadata;
Status status = SendRpc(RpcOptions().set_echo_metadata_initially(true),
/*response=*/nullptr, &server_initial_metadata);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_THAT(server_initial_metadata,
::testing::Contains(::testing::Pair(
"authorization", absl::StrCat("Bearer ", g_token))));
EXPECT_EQ(g_num_token_fetches.load(), 1);
// Trigger update that changes the route config, thus causing the
// dynamic filters to be recreated.
// We insert a route that matches requests with the header "foo" and
// has a non-forwarding action, which will cause the client to fail RPCs
// that hit this route.
RouteConfiguration route_config = default_route_config_;
*route_config.mutable_virtual_hosts(0)->add_routes() =
route_config.virtual_hosts(0).routes(0);
auto* header_matcher = route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_match()
->add_headers();
header_matcher->set_name("foo");
header_matcher->set_present_match(true);
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_non_forwarding_action();
SetListenerAndRouteConfiguration(
balancer_.get(), BuildListenerWithGcpAuthnFilter(), route_config);
// Send RPCs with the header "foo" and wait for them to start failing.
// When they do, we know that the client has seen the update.
SendRpcsUntil(
DEBUG_LOCATION,
[&](const RpcResult& result) {
if (result.status.ok()) return true;
EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
EXPECT_EQ("Matching route has inappropriate action",
result.status.error_message());
return false;
},
/*timeout_ms=*/15000, RpcOptions().set_metadata({{"foo", "bar"}}));
// Now send an RPC without the header, which will go through the new
// instance of the GCP auth filter.
CheckRpcSendOk(DEBUG_LOCATION);
// Make sure we didn't re-fetch the token.
EXPECT_EQ(g_num_token_fetches.load(), 1);
}
TEST_P(XdsGcpAuthnEnd2endTest, FilterIgnoredWhenEnvVarNotSet) {
// Construct auth token.
g_audience = kAudience;

@ -2081,6 +2081,8 @@ src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h \
src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h \
src/core/filter/blackboard.cc \
src/core/filter/blackboard.h \
src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc \
src/core/handshaker/endpoint_info/endpoint_info_handshaker.h \
src/core/handshaker/handshaker.cc \

@ -1848,6 +1848,8 @@ src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/range.upbdefs.h \
src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.c \
src/core/ext/upbdefs-gen/xds/type/v3/typed_struct.upbdefs.h \
src/core/filter/blackboard.cc \
src/core/filter/blackboard.h \
src/core/handshaker/endpoint_info/endpoint_info_handshaker.cc \
src/core/handshaker/endpoint_info/endpoint_info_handshaker.h \
src/core/handshaker/handshaker.cc \

@ -1161,6 +1161,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "blackboard_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save