diff --git a/BUILD b/BUILD
index a0159cd8a3e..06e33b48d18 100644
--- a/BUILD
+++ b/BUILD
@@ -1934,6 +1934,7 @@ grpc_cc_library(
"src/core/lib/backoff/backoff.h",
"src/core/lib/channel/call_tracer.h",
"src/core/lib/channel/channel_stack.h",
+ "src/core/lib/channel/promise_based_filter.h",
"src/core/lib/channel/channel_stack_builder.h",
"src/core/lib/channel/channel_trace.h",
"src/core/lib/channel/channelz.h",
@@ -2086,6 +2087,7 @@ grpc_cc_library(
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"arena",
+ "arena_promise",
"avl",
"bitset",
"channel_args",
@@ -2108,6 +2110,7 @@ grpc_cc_library(
"json",
"memory_quota",
"orphanable",
+ "promise",
"ref_counted",
"ref_counted_ptr",
"resolved_address",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 320e209fedd..efd344d89a7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -820,6 +820,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx channelz_test)
add_dependencies(buildtests_cxx chunked_vector_test)
add_dependencies(buildtests_cxx cli_call_test)
+ add_dependencies(buildtests_cxx client_authority_filter_test)
add_dependencies(buildtests_cxx client_callback_end2end_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_channel_stress_test)
@@ -9047,6 +9048,41 @@ target_link_libraries(cli_call_test
)
+endif()
+if(gRPC_BUILD_TESTS)
+
+add_executable(client_authority_filter_test
+ test/core/filters/client_authority_filter_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(client_authority_filter_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(client_authority_filter_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc
+)
+
+
endif()
if(gRPC_BUILD_TESTS)
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 48433be2e88..2494fe6b435 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -742,6 +742,7 @@ libs:
- src/core/lib/channel/handshaker.h
- src/core/lib/channel/handshaker_factory.h
- src/core/lib/channel/handshaker_registry.h
+ - src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/status_util.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/compression/message_compress.h
@@ -848,6 +849,7 @@ libs:
- src/core/lib/json/json_util.h
- src/core/lib/matchers/matchers.h
- src/core/lib/promise/activity.h
+ - src/core/lib/promise/arena_promise.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
@@ -858,6 +860,7 @@ libs:
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/poll.h
+ - src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/resolver/resolver.h
@@ -1851,6 +1854,7 @@ libs:
- src/core/lib/channel/handshaker.h
- src/core/lib/channel/handshaker_factory.h
- src/core/lib/channel/handshaker_registry.h
+ - src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/status_util.h
- src/core/lib/compression/compression_internal.h
- src/core/lib/compression/message_compress.h
@@ -1956,6 +1960,7 @@ libs:
- src/core/lib/json/json.h
- src/core/lib/json/json_util.h
- src/core/lib/promise/activity.h
+ - src/core/lib/promise/arena_promise.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
@@ -1966,6 +1971,7 @@ libs:
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/poll.h
+ - src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/resolver/resolver.h
@@ -5057,6 +5063,16 @@ targets:
- test/cpp/util/service_describer.cc
deps:
- grpc++_test_util
+- name: client_authority_filter_test
+ gtest: true
+ build: test
+ language: c++
+ headers: []
+ src:
+ - test/core/filters/client_authority_filter_test.cc
+ deps:
+ - grpc
+ uses_polling: false
- name: client_callback_end2end_test
gtest: true
build: test
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 08b706e5d31..570295f17fb 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -587,6 +587,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/handshaker.h',
'src/core/lib/channel/handshaker_factory.h',
'src/core/lib/channel/handshaker_registry.h',
+ 'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression_internal.h',
'src/core/lib/compression/message_compress.h',
@@ -721,6 +722,7 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
@@ -731,6 +733,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/resolver/resolver.h',
@@ -1320,6 +1323,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/handshaker.h',
'src/core/lib/channel/handshaker_factory.h',
'src/core/lib/channel/handshaker_registry.h',
+ 'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression_internal.h',
'src/core/lib/compression/message_compress.h',
@@ -1454,6 +1458,7 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
@@ -1464,6 +1469,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/resolver/resolver.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 4058ada3151..3e0c2daccf3 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -880,6 +880,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/handshaker_factory.h',
'src/core/lib/channel/handshaker_registry.cc',
'src/core/lib/channel/handshaker_registry.h',
+ 'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/status_util.cc',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression.cc',
@@ -1174,6 +1175,7 @@ Pod::Spec.new do |s|
'src/core/lib/profiling/timers.h',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
@@ -1184,6 +1186,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/resolver/resolver.cc',
@@ -1862,6 +1865,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/handshaker.h',
'src/core/lib/channel/handshaker_factory.h',
'src/core/lib/channel/handshaker_registry.h',
+ 'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/compression/compression_internal.h',
'src/core/lib/compression/message_compress.h',
@@ -1996,6 +2000,7 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
@@ -2006,6 +2011,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/resolver/resolver.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 59eeb725d4a..488450b4779 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -799,6 +799,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/handshaker_factory.h )
s.files += %w( src/core/lib/channel/handshaker_registry.cc )
s.files += %w( src/core/lib/channel/handshaker_registry.h )
+ s.files += %w( src/core/lib/channel/promise_based_filter.h )
s.files += %w( src/core/lib/channel/status_util.cc )
s.files += %w( src/core/lib/channel/status_util.h )
s.files += %w( src/core/lib/compression/compression.cc )
@@ -1093,6 +1094,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/profiling/timers.h )
s.files += %w( src/core/lib/promise/activity.cc )
s.files += %w( src/core/lib/promise/activity.h )
+ s.files += %w( src/core/lib/promise/arena_promise.h )
s.files += %w( src/core/lib/promise/context.h )
s.files += %w( src/core/lib/promise/detail/basic_seq.h )
s.files += %w( src/core/lib/promise/detail/promise_factory.h )
@@ -1103,6 +1105,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/loop.h )
s.files += %w( src/core/lib/promise/map.h )
s.files += %w( src/core/lib/promise/poll.h )
+ s.files += %w( src/core/lib/promise/promise.h )
s.files += %w( src/core/lib/promise/race.h )
s.files += %w( src/core/lib/promise/seq.h )
s.files += %w( src/core/lib/resolver/resolver.cc )
diff --git a/package.xml b/package.xml
index 1cd14b1714e..5568b43aa3c 100644
--- a/package.xml
+++ b/package.xml
@@ -779,6 +779,7 @@
+
@@ -1073,6 +1074,7 @@
+
@@ -1083,6 +1085,7 @@
+
diff --git a/src/core/ext/filters/http/client_authority_filter.cc b/src/core/ext/filters/http/client_authority_filter.cc
index e46765009cf..9540944683d 100644
--- a/src/core/ext/filters/http/client_authority_filter.cc
+++ b/src/core/ext/filters/http/client_authority_filter.cc
@@ -28,7 +28,7 @@
#include
#include
-#include "src/core/lib/channel/channel_args.h"
+#include "src/core/ext/filters/http/client_authority_filter.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
@@ -37,87 +37,43 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_stack_type.h"
-namespace {
-
-struct call_data {
- grpc_core::CallCombiner* call_combiner;
-};
-
-struct channel_data {
- grpc_core::Slice default_authority;
-};
-
-void client_authority_start_transport_stream_op_batch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
- channel_data* chand = static_cast(elem->channel_data);
- // Handle send_initial_metadata.
- // If the initial metadata doesn't already contain :authority, add it.
- if (batch->send_initial_metadata &&
- batch->payload->send_initial_metadata.send_initial_metadata->get_pointer(
- grpc_core::HttpAuthorityMetadata()) == nullptr) {
- batch->payload->send_initial_metadata.send_initial_metadata->Set(
- grpc_core::HttpAuthorityMetadata(), chand->default_authority.Ref());
- }
- // Pass control down the stack.
- grpc_call_next_op(elem, batch);
-}
-
-/* Constructor for call_data */
-grpc_error_handle client_authority_init_call_elem(
- grpc_call_element* elem, const grpc_call_element_args* args) {
- call_data* calld = static_cast(elem->call_data);
- calld->call_combiner = args->call_combiner;
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for call_data */
-void client_authority_destroy_call_elem(
- grpc_call_element* /*elem*/, const grpc_call_final_info* /*final_info*/,
- grpc_closure* /*ignored*/) {}
+namespace grpc_core {
-/* Constructor for channel_data */
-grpc_error_handle client_authority_init_channel_elem(
- grpc_channel_element* elem, grpc_channel_element_args* args) {
- channel_data* chand = new (elem->channel_data) channel_data;
+absl::StatusOr ClientAuthorityFilter::Create(
+ const grpc_channel_args* args) {
const grpc_arg* default_authority_arg =
- grpc_channel_args_find(args->channel_args, GRPC_ARG_DEFAULT_AUTHORITY);
+ grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY);
if (default_authority_arg == nullptr) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ return absl::InvalidArgumentError(
"GRPC_ARG_DEFAULT_AUTHORITY channel arg. not found. Note that direct "
"channels must explicitly specify a value for this argument.");
}
const char* default_authority_str =
grpc_channel_arg_get_string(default_authority_arg);
if (default_authority_str == nullptr) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ return absl::InvalidArgumentError(
"GRPC_ARG_DEFAULT_AUTHORITY channel arg. must be a string");
}
- chand->default_authority =
- grpc_core::Slice::FromCopiedString(default_authority_str);
- GPR_ASSERT(!args->is_last);
- return GRPC_ERROR_NONE;
+ return ClientAuthorityFilter(Slice::FromCopiedString(default_authority_str));
}
-/* Destructor for channel data */
-void client_authority_destroy_channel_elem(grpc_channel_element* elem) {
- static_cast(elem->channel_data)->~channel_data();
+ArenaPromise ClientAuthorityFilter::MakeCallPromise(
+ ClientInitialMetadata initial_metadata,
+ NextPromiseFactory next_promise_factory) {
+ // If no authority is set, set the default authority.
+ if (initial_metadata->get_pointer(HttpAuthorityMetadata()) == nullptr) {
+ initial_metadata->Set(HttpAuthorityMetadata(), default_authority_.Ref());
+ }
+ // We have no asynchronous work, so we can just ask the next promise to run,
+ // passing down initial_metadata.
+ return next_promise_factory(std::move(initial_metadata));
}
-} // namespace
-const grpc_channel_filter grpc_client_authority_filter = {
- client_authority_start_transport_stream_op_batch,
- grpc_channel_next_op,
- sizeof(call_data),
- client_authority_init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- client_authority_destroy_call_elem,
- sizeof(channel_data),
- client_authority_init_channel_elem,
- client_authority_destroy_channel_elem,
- grpc_channel_next_get_info,
- "authority"};
+namespace {
+const grpc_channel_filter grpc_client_authority_filter =
+ MakePromiseBasedFilter();
-static bool add_client_authority_filter(grpc_channel_stack_builder* builder) {
+bool add_client_authority_filter(grpc_channel_stack_builder* builder) {
const grpc_channel_args* channel_args =
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_arg* disable_client_authority_filter_arg = grpc_channel_args_find(
@@ -132,12 +88,13 @@ static bool add_client_authority_filter(grpc_channel_stack_builder* builder) {
return grpc_channel_stack_builder_prepend_filter(
builder, &grpc_client_authority_filter, nullptr, nullptr);
}
+} // namespace
-namespace grpc_core {
void RegisterClientAuthorityFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
add_client_authority_filter);
builder->channel_init()->RegisterStage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
add_client_authority_filter);
}
+
} // namespace grpc_core
diff --git a/src/core/ext/filters/http/client_authority_filter.h b/src/core/ext/filters/http/client_authority_filter.h
index 5824e91ff21..3486654dcee 100644
--- a/src/core/ext/filters/http/client_authority_filter.h
+++ b/src/core/ext/filters/http/client_authority_filter.h
@@ -21,14 +21,35 @@
#include
+#include "absl/status/statusor.h"
+
#include
-#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/promise_based_filter.h"
+#include "src/core/lib/slice/slice.h"
+
+namespace grpc_core {
+
+class ClientAuthorityFilter {
+ public:
+ static absl::StatusOr Create(
+ const grpc_channel_args* args);
+
+ static constexpr bool is_client() { return true; }
+ static constexpr const char* name() { return "authority"; }
+
+ // Construct a promise for one call.
+ ArenaPromise MakeCallPromise(
+ ClientInitialMetadata initial_metadata,
+ NextPromiseFactory next_promise_factory);
-/// Filter responsible for setting the authority header, if not already set. It
-/// uses the value of the GRPC_ARG_DEFAULT_AUTHORITY channel arg if the initial
-/// metadata doesn't already contain an authority value.
+ private:
+ explicit ClientAuthorityFilter(Slice default_authority)
+ : default_authority_(std::move(default_authority)) {}
+ Slice default_authority_;
+};
-extern const grpc_channel_filter grpc_client_authority_filter;
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_CLIENT_AUTHORITY_FILTER_H */
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 04df03f636c..df40ad3b7c6 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1353,8 +1353,10 @@ static void perform_stream_op_locked(void* stream_op,
s->context = op->payload->context;
s->traced = op->is_traced;
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
- gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p",
- grpc_transport_stream_op_batch_string(op).c_str(), op->on_complete);
+ gpr_log(GPR_INFO,
+ "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s,
+ op, grpc_transport_stream_op_batch_string(op).c_str(),
+ op->on_complete);
if (op->send_initial_metadata) {
log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
s->id, t->is_client, true);
@@ -1598,7 +1600,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
- gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s,
+ gpr_log(GPR_INFO, "perform_stream_op[s=%p; op=%p]: %s", s, op,
grpc_transport_stream_op_batch_string(op).c_str());
}
diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h
new file mode 100644
index 00000000000..d1bee3ebf68
--- /dev/null
+++ b/src/core/lib/channel/promise_based_filter.h
@@ -0,0 +1,497 @@
+// Copyright 2022 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef GRPC_CORE_LIB_CHANNEL_PROMISE_BASED_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_PROMISE_BASED_FILTER_H
+
+// Scaffolding to allow the per-call part of a filter to be authored in a
+// promise-style. Most of this will be removed once the promises conversion is
+// completed.
+
+#include
+
+#include "absl/utility/utility.h"
+
+#include
+#include
+
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/promise/arena_promise.h"
+#include "src/core/lib/promise/promise.h"
+#include "src/core/lib/transport/error_utils.h"
+
+namespace grpc_core {
+
+namespace promise_filter_detail {
+class BaseCallData;
+};
+
+// Small unowned "handle" type to ensure one accessor at a time to metadata.
+// The focus here is to get promises to use the syntax we'd like - we'll
+// probably substitute some other smart pointer later.
+template
+class MetadataHandle {
+ public:
+ MetadataHandle() = default;
+
+ MetadataHandle(const MetadataHandle&) = delete;
+ MetadataHandle& operator=(const MetadataHandle&) = delete;
+
+ MetadataHandle(MetadataHandle&& other) noexcept : handle_(other.handle_) {
+ other.handle_ = nullptr;
+ }
+ MetadataHandle& operator=(MetadataHandle&& other) noexcept {
+ handle_ = other.handle_;
+ other.handle_ = nullptr;
+ return *this;
+ }
+
+ T* operator->() const { return handle_; }
+ bool has_value() const { return handle_ != nullptr; }
+
+ static MetadataHandle TestOnlyWrap(T* p) { return MetadataHandle(p); }
+
+ private:
+ friend class promise_filter_detail::BaseCallData;
+
+ explicit MetadataHandle(T* handle) : handle_(handle) {}
+ T* Unwrap() {
+ T* result = handle_;
+ handle_ = nullptr;
+ return result;
+ }
+
+ T* handle_ = nullptr;
+};
+
+// Trailing metadata type
+// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
+using TrailingMetadata = MetadataHandle;
+
+// Client initial metadata type
+// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
+using ClientInitialMetadata = MetadataHandle;
+
+using NextPromiseFactory =
+ std::function(ClientInitialMetadata)>;
+
+namespace promise_filter_detail {
+
+// Call data shared between all implementations of promise-based filters.
+class BaseCallData {
+ public:
+ BaseCallData(grpc_call_element* elem, const grpc_call_element_args* args)
+ : elem_(elem),
+ arena_(args->arena),
+ call_combiner_(args->call_combiner),
+ deadline_(args->deadline) {}
+
+ protected:
+ class ScopedContext : public promise_detail::Context {
+ public:
+ explicit ScopedContext(BaseCallData* call_data)
+ : promise_detail::Context(call_data->arena_) {}
+ };
+
+ static MetadataHandle WrapMetadata(
+ grpc_metadata_batch* p) {
+ return MetadataHandle(p);
+ }
+
+ static grpc_metadata_batch* UnwrapMetadata(
+ MetadataHandle p) {
+ return p.Unwrap();
+ }
+
+ grpc_call_element* elem() const { return elem_; }
+ CallCombiner* call_combiner() const { return call_combiner_; }
+ grpc_millis deadline() const { return deadline_; }
+
+ private:
+ grpc_call_element* const elem_;
+ Arena* const arena_;
+ CallCombiner* const call_combiner_;
+ const grpc_millis deadline_;
+};
+
+// Specific call data per channel filter.
+// Note that we further specialize for clients and servers since their
+// implementations are very different.
+template
+class CallData;
+
+// Client implementation of call data.
+template
+class CallData : public BaseCallData {
+ public:
+ CallData(grpc_call_element* elem, const grpc_call_element_args* args)
+ : BaseCallData(elem, args) {
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
+ RecvTrailingMetadataReadyCallback, this,
+ grpc_schedule_on_exec_ctx);
+ }
+
+ ~CallData() {
+ GPR_ASSERT(!is_polling_);
+ GRPC_ERROR_UNREF(cancelled_error_);
+ }
+
+ // Handle one grpc_transport_stream_op_batch
+ void StartBatch(grpc_transport_stream_op_batch* batch) {
+ // Fake out the activity based context.
+ ScopedContext context(this);
+
+ // If this is a cancel stream, cancel anything we have pending and propagate
+ // the cancellation.
+ if (batch->cancel_stream) {
+ GPR_ASSERT(!batch->send_initial_metadata &&
+ !batch->send_trailing_metadata && !batch->send_message &&
+ !batch->recv_initial_metadata && !batch->recv_message &&
+ !batch->recv_trailing_metadata);
+ Cancel(batch->payload->cancel_stream.cancel_error);
+ grpc_call_next_op(elem(), batch);
+ return;
+ }
+
+ // send_initial_metadata: seeing this triggers the start of the promise part
+ // of this filter.
+ if (batch->send_initial_metadata) {
+ // If we're already cancelled, just terminate the batch.
+ if (send_initial_state_ == SendInitialState::kCancelled) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batch, GRPC_ERROR_REF(cancelled_error_), call_combiner());
+ return;
+ }
+ // Otherwise, we should not have seen a send_initial_metadata op yet.
+ GPR_ASSERT(send_initial_state_ == SendInitialState::kInitial);
+ // Mark ourselves as queued.
+ send_initial_state_ = SendInitialState::kQueued;
+ if (batch->recv_trailing_metadata) {
+ // If there's a recv_trailing_metadata op, we queue that too.
+ GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kInitial);
+ recv_trailing_state_ = RecvTrailingState::kQueued;
+ }
+ // This is the queuing!
+ send_initial_metadata_batch_ = batch;
+ // And kick start the promise.
+ StartPromise();
+ return;
+ }
+
+ // recv_trailing_metadata *without* send_initial_metadata: hook it so we can
+ // respond to it, and push it down.
+ if (batch->recv_trailing_metadata) {
+ GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kInitial);
+ recv_trailing_state_ = RecvTrailingState::kForwarded;
+ HookRecvTrailingMetadata(batch);
+ }
+
+ grpc_call_next_op(elem(), batch);
+ }
+
+ private:
+ // At what stage is our handling of send initial metadata?
+ enum class SendInitialState {
+ // Start state: no op seen
+ kInitial,
+ // We've seen the op, and started the promise in response to it, but have
+ // not yet sent the op to the next filter.
+ kQueued,
+ // We've sent the op to the next filter.
+ kForwarded,
+ // We were cancelled.
+ kCancelled
+ };
+ // At what stage is our handling of recv trailing metadata?
+ enum class RecvTrailingState {
+ // Start state: no op seen
+ kInitial,
+ // We saw the op, and since it was bundled with send initial metadata, we
+ // queued it until the send initial metadata can be sent to the next filter.
+ kQueued,
+ // We've forwarded the op to the next filter.
+ kForwarded,
+ // The op has completed from below, but we haven't yet forwarded it up (the
+ // promise gets to interject and mutate it).
+ kComplete,
+ // We've called the recv_metadata_ready callback from the original
+ // recv_trailing_metadata op that was presented to us.
+ kResponded,
+ // We've been cancelled and handled that locally.
+ // (i.e. whilst the recv_trailing_metadata op is queued in this filter).
+ kCancelled
+ };
+
+ // Handle cancellation.
+ void Cancel(grpc_error_handle error) {
+ // Track the latest reason for cancellation.
+ GRPC_ERROR_UNREF(cancelled_error_);
+ cancelled_error_ = GRPC_ERROR_REF(error);
+ // Stop running the promise.
+ promise_ = ArenaPromise();
+ // If we have an op queued, fail that op.
+ // Record what we've done.
+ if (send_initial_state_ == SendInitialState::kQueued) {
+ send_initial_state_ = SendInitialState::kCancelled;
+ if (recv_trailing_state_ == RecvTrailingState::kQueued) {
+ recv_trailing_state_ = RecvTrailingState::kCancelled;
+ }
+ grpc_transport_stream_op_batch_finish_with_failure(
+ absl::exchange(send_initial_metadata_batch_, nullptr),
+ GRPC_ERROR_REF(cancelled_error_), call_combiner());
+ } else {
+ send_initial_state_ = SendInitialState::kCancelled;
+ }
+ }
+
+ // Begin running the promise - which will ultimately take some initial
+ // metadata and return some trailing metadata.
+ void StartPromise() {
+ GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued);
+ ChannelFilter* filter = static_cast(elem()->channel_data);
+
+ // Construct the promise.
+ promise_ = filter->MakeCallPromise(
+ WrapMetadata(send_initial_metadata_batch_->payload
+ ->send_initial_metadata.send_initial_metadata),
+ [this](ClientInitialMetadata initial_metadata) {
+ return MakeNextPromise(std::move(initial_metadata));
+ });
+ // Poll once.
+ WakeInsideCombiner();
+ }
+
+ // Interject our callback into the op batch for recv trailing metadata ready.
+ // Stash a pointer to the trailing metadata that will be filled in, so we can
+ // manipulate it later.
+ void HookRecvTrailingMetadata(grpc_transport_stream_op_batch* batch) {
+ recv_trailing_metadata_ =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ original_recv_trailing_metadata_ready_ =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &recv_trailing_metadata_ready_;
+ }
+
+ // Construct a promise that will "call" the next filter.
+ // Effectively:
+ // - put the modified initial metadata into the batch to be sent down.
+ // - return a wrapper around PollTrailingMetadata as the promise.
+ ArenaPromise MakeNextPromise(
+ ClientInitialMetadata initial_metadata) {
+ GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued);
+ send_initial_metadata_batch_->payload->send_initial_metadata
+ .send_initial_metadata = UnwrapMetadata(std::move(initial_metadata));
+ return ArenaPromise(
+ [this]() { return PollTrailingMetadata(); });
+ }
+
+ // Wrapper to make it look like we're calling the next filter as a promise.
+ // First poll: send the send_initial_metadata op down the stack.
+ // All polls: await receiving the trailing metadata, then return it to the
+ // application.
+ Poll PollTrailingMetadata() {
+ if (send_initial_state_ == SendInitialState::kQueued) {
+ // First poll: pass the send_initial_metadata op down the stack.
+ GPR_ASSERT(send_initial_metadata_batch_ != nullptr);
+ send_initial_state_ = SendInitialState::kForwarded;
+ if (recv_trailing_state_ == RecvTrailingState::kQueued) {
+ // (and the recv_trailing_metadata op if it's part of the queuing)
+ HookRecvTrailingMetadata(send_initial_metadata_batch_);
+ recv_trailing_state_ = RecvTrailingState::kForwarded;
+ }
+ forward_send_initial_metadata_ = true;
+ }
+ switch (recv_trailing_state_) {
+ case RecvTrailingState::kInitial:
+ case RecvTrailingState::kQueued:
+ case RecvTrailingState::kForwarded:
+ // No trailing metadata yet: we are pending.
+ // We return that and expect the promise to be repolled later (if it's
+ // not cancelled).
+ return Pending{};
+ case RecvTrailingState::kComplete:
+ // We've received trailing metadata: pass it to the promise and allow it
+ // to adjust it.
+ return WrapMetadata(recv_trailing_metadata_);
+ case RecvTrailingState::kCancelled: {
+ // We've been cancelled: synthesize some trailing metadata and pass it
+ // to the calling promise for adjustment.
+ recv_trailing_metadata_->Clear();
+ SetStatusFromError(recv_trailing_metadata_, cancelled_error_);
+ return WrapMetadata(recv_trailing_metadata_);
+ }
+ case RecvTrailingState::kResponded:
+ // We've already responded to the caller: we can't do anything and we
+ // should never reach here.
+ abort();
+ }
+ GPR_UNREACHABLE_CODE(return Pending{});
+ }
+
+ static void RecvTrailingMetadataReadyCallback(void* arg,
+ grpc_error_handle error) {
+ static_cast(arg)->RecvTrailingMetadataReady(error);
+ }
+
+ void RecvTrailingMetadataReady(grpc_error_handle error) {
+ // If there was an error, we'll put that into the trailing metadata and
+ // proceed as if there was not.
+ if (error != GRPC_ERROR_NONE) {
+ SetStatusFromError(recv_trailing_metadata_, error);
+ }
+ // Record that we've got the callback.
+ GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kForwarded);
+ recv_trailing_state_ = RecvTrailingState::kComplete;
+ // Repoll the promise.
+ ScopedContext context(this);
+ WakeInsideCombiner();
+ }
+
+ // Given an error, fill in TrailingMetadata to represent that error.
+ void SetStatusFromError(grpc_metadata_batch* metadata,
+ grpc_error_handle error) {
+ grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
+ std::string status_details;
+ grpc_error_get_status(error, deadline(), &status_code, &status_details,
+ nullptr, nullptr);
+ metadata->Set(GrpcStatusMetadata(), status_code);
+ metadata->Set(GrpcMessageMetadata(),
+ Slice::FromCopiedString(status_details));
+ }
+
+ // Wakeup and poll the promise if appropriate.
+ void WakeInsideCombiner() {
+ GPR_ASSERT(!is_polling_);
+ grpc_closure* call_closure = nullptr;
+ is_polling_ = true;
+ switch (send_initial_state_) {
+ case SendInitialState::kQueued:
+ case SendInitialState::kForwarded: {
+ // Poll the promise once since we're waiting for it.
+ Poll poll = promise_();
+ if (auto* r = absl::get_if(&poll)) {
+ GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kComplete);
+ GPR_ASSERT(recv_trailing_metadata_ == UnwrapMetadata(std::move(*r)));
+ recv_trailing_state_ = RecvTrailingState::kResponded;
+ call_closure =
+ absl::exchange(original_recv_trailing_metadata_ready_, nullptr);
+ }
+ } break;
+ case SendInitialState::kInitial:
+ case SendInitialState::kCancelled:
+ // If we get a response without sending anything, we just propagate that
+ // up. (note: that situation isn't possible once we finish the promise
+ // transition).
+ if (recv_trailing_state_ == RecvTrailingState::kComplete) {
+ recv_trailing_state_ = RecvTrailingState::kResponded;
+ call_closure =
+ absl::exchange(original_recv_trailing_metadata_ready_, nullptr);
+ }
+ break;
+ }
+ is_polling_ = false;
+ if (absl::exchange(forward_send_initial_metadata_, false)) {
+ grpc_call_next_op(elem(),
+ absl::exchange(send_initial_metadata_batch_, nullptr));
+ }
+ if (call_closure != nullptr) {
+ Closure::Run(DEBUG_LOCATION, call_closure, GRPC_ERROR_NONE);
+ }
+ }
+
+ // Contained promise
+ ArenaPromise promise_;
+ // Queued batch containing at least a send_initial_metadata op.
+ grpc_transport_stream_op_batch* send_initial_metadata_batch_ = nullptr;
+ // Pointer to where trailing metadata will be stored.
+ grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
+ // Closure to call when we're done with the trailing metadata.
+ grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
+ // Our closure pointing to RecvTrailingMetadataReadyCallback.
+ grpc_closure recv_trailing_metadata_ready_;
+ // Error received during cancellation.
+ grpc_error_handle cancelled_error_ = GRPC_ERROR_NONE;
+ // State of the send_initial_metadata op.
+ SendInitialState send_initial_state_ = SendInitialState::kInitial;
+ // State of the recv_trailing_metadata op.
+ RecvTrailingState recv_trailing_state_ = RecvTrailingState::kInitial;
+ // Whether we're currently polling the promise.
+ bool is_polling_ = false;
+ // Whether we should forward send initial metadata after polling?
+ bool forward_send_initial_metadata_ = false;
+};
+
+} // namespace promise_filter_detail
+
+// ChannelFilter contains the following:
+// class SomeChannelFilter {
+// public:
+// static constexpr bool is_client();
+// static constexpr const char* name();
+// static absl::StatusOr Create(
+// const grpc_channel_args* args);
+// ArenaPromise MakeCallPromise(
+// InitialMetadata* initial_metadata, NextPromiseFactory next_promise);
+// };
+// TODO(ctiller): allow implementing get_channel_info, start_transport_op in
+// some way on ChannelFilter.
+template
+grpc_channel_filter MakePromiseBasedFilter() {
+ using CallData = promise_filter_detail::CallData;
+
+ return grpc_channel_filter{
+ // start_transport_stream_op_batch
+ [](grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ static_cast(elem->call_data)->StartBatch(batch);
+ },
+ // start_transport_op - for now unsupported
+ grpc_channel_next_op,
+ // sizeof_call_data
+ sizeof(CallData),
+ // init_call_elem
+ [](grpc_call_element* elem, const grpc_call_element_args* args) {
+ new (elem->call_data) CallData(elem, args);
+ return GRPC_ERROR_NONE;
+ },
+ // set_pollset_or_pollset_set
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ // destroy_call_elem
+ [](grpc_call_element* elem, const grpc_call_final_info*, grpc_closure*) {
+ static_cast(elem->call_data)->~CallData();
+ },
+ // sizeof_channel_data
+ sizeof(ChannelFilter),
+ // init_channel_elem
+ [](grpc_channel_element* elem, grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+ auto status = ChannelFilter::Create(args->channel_args);
+ if (!status.ok()) return absl_status_to_grpc_error(status.status());
+ new (elem->channel_data) ChannelFilter(std::move(*status));
+ return GRPC_ERROR_NONE;
+ },
+ // destroy_channel_elem
+ [](grpc_channel_element* elem) {
+ static_cast(elem->channel_data)->~ChannelFilter();
+ },
+ // get_channel_info
+ grpc_channel_next_get_info,
+ // name
+ ChannelFilter::name(),
+ };
+}
+
+} // namespace grpc_core
+
+#endif // GRPC_CORE_LIB_CHANNEL_PROMISE_BASED_FILTER_H
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 7786ccff5a5..ed7e480b796 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -25,6 +25,7 @@
#include
+#include "absl/strings/escaping.h"
#include "absl/strings/match.h"
#include "absl/strings/str_join.h"
#include "absl/types/optional.h"
@@ -909,6 +910,15 @@ class MetadataMap {
void Log(absl::FunctionRef log_fn)
const;
+ std::string DebugString() const {
+ std::string out;
+ Log([&out](absl::string_view key, absl::string_view value) {
+ if (!out.empty()) out.append(", ");
+ absl::StrAppend(&out, absl::CEscape(key), ": ", absl::CEscape(value));
+ });
+ return out;
+ }
+
// Get the pointer to the value of some known metadata.
// Returns nullptr if the metadata is not present.
// Causes a compilation error if Which is not an element of Traits.
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 8f61cc2f9f9..b0899e287a7 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -41,24 +41,14 @@
/* These routines are here to facilitate debugging - they produce string
representations of various transport data structures */
-static void put_metadata_list(const grpc_metadata_batch& md,
- std::vector* out) {
- bool first = true;
- md.Log([out, &first](absl::string_view key, absl::string_view value) {
- if (!first) out->push_back(", ");
- first = false;
- out->push_back(absl::StrCat(absl::CEscape(key), "=", absl::CEscape(value)));
- });
-}
-
std::string grpc_transport_stream_op_batch_string(
grpc_transport_stream_op_batch* op) {
std::vector out;
if (op->send_initial_metadata) {
out.push_back(" SEND_INITIAL_METADATA{");
- put_metadata_list(*op->payload->send_initial_metadata.send_initial_metadata,
- &out);
+ out.push_back(op->payload->send_initial_metadata.send_initial_metadata
+ ->DebugString());
out.push_back("}");
}
@@ -77,8 +67,8 @@ std::string grpc_transport_stream_op_batch_string(
if (op->send_trailing_metadata) {
out.push_back(" SEND_TRAILING_METADATA{");
- put_metadata_list(
- *op->payload->send_trailing_metadata.send_trailing_metadata, &out);
+ out.push_back(op->payload->send_trailing_metadata.send_trailing_metadata
+ ->DebugString());
out.push_back("}");
}
@@ -97,7 +87,9 @@ std::string grpc_transport_stream_op_batch_string(
if (op->cancel_stream) {
out.push_back(absl::StrCat(
" CANCEL:",
- grpc_error_std_string(op->payload->cancel_stream.cancel_error)));
+ absl::StrFormat(
+ "%p", static_cast(op->payload->cancel_stream.cancel_error)),
+ ":", grpc_error_std_string(op->payload->cancel_stream.cancel_error)));
}
return absl::StrJoin(out, "");
diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc
index df41aa3ead0..d9afa350fc8 100644
--- a/test/core/end2end/goaway_server_test.cc
+++ b/test/core/end2end/goaway_server_test.cc
@@ -165,7 +165,8 @@ int main(int argc, char** argv) {
gpr_mu_init(&g_mu);
grpc_init();
g_default_dns_resolver = grpc_core::GetDNSResolver();
- grpc_core::SetDNSResolver(new TestDNSResolver());
+ auto* resolver = new TestDNSResolver();
+ grpc_core::SetDNSResolver(resolver);
iomgr_dns_lookup_ares = grpc_dns_lookup_ares;
iomgr_cancel_ares_request = grpc_cancel_ares_request;
grpc_dns_lookup_ares = my_dns_lookup_ares;
@@ -395,6 +396,8 @@ int main(int argc, char** argv) {
cq_verifier_destroy(cqv);
grpc_completion_queue_destroy(cq);
+ grpc_core::SetDNSResolver(g_default_dns_resolver);
+ delete resolver;
grpc_shutdown();
gpr_mu_destroy(&g_mu);
diff --git a/test/core/filters/BUILD b/test/core/filters/BUILD
new file mode 100644
index 00000000000..39765aa0801
--- /dev/null
+++ b/test/core/filters/BUILD
@@ -0,0 +1,32 @@
+# Copyright 2022 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
+
+licenses(["notice"])
+
+grpc_package(name = "test/core/filters")
+
+grpc_cc_test(
+ name = "client_authority_filter_test",
+ srcs = ["client_authority_filter_test.cc"],
+ external_deps = ["gtest"],
+ language = "c++",
+ uses_polling = False,
+ deps = [
+ "//:grpc",
+ "//:grpc_client_authority_filter",
+ "//test/core/util:grpc_suppressions",
+ ],
+)
diff --git a/test/core/filters/client_authority_filter_test.cc b/test/core/filters/client_authority_filter_test.cc
new file mode 100644
index 00000000000..1d2c970da8b
--- /dev/null
+++ b/test/core/filters/client_authority_filter_test.cc
@@ -0,0 +1,119 @@
+// Copyright 2022 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/core/ext/filters/http/client_authority_filter.h"
+
+#include
+
+#include "src/core/lib/resource_quota/resource_quota.h"
+
+namespace grpc_core {
+namespace {
+
+auto* g_memory_allocator = new MemoryAllocator(
+ ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
+
+class TestChannelArgs {
+ public:
+ explicit TestChannelArgs(const char* default_authority)
+ : arg_(grpc_channel_arg_string_create(
+ const_cast(GRPC_ARG_DEFAULT_AUTHORITY),
+ const_cast(default_authority))),
+ args_{1, &arg_} {}
+
+ const grpc_channel_args* args() const { return &args_; }
+
+ private:
+ grpc_arg arg_;
+ grpc_channel_args args_;
+};
+
+TEST(ClientAuthorityFilterTest, DefaultFails) {
+ EXPECT_FALSE(ClientAuthorityFilter::Create(nullptr).ok());
+}
+
+TEST(ClientAuthorityFilterTest, WithArgSucceeds) {
+ EXPECT_EQ(ClientAuthorityFilter::Create(
+ TestChannelArgs("foo.test.google.au").args())
+ .status(),
+ absl::OkStatus());
+}
+
+TEST(ClientAuthorityFilterTest, NonStringArgFails) {
+ grpc_arg arg = grpc_channel_arg_integer_create(
+ const_cast(GRPC_ARG_DEFAULT_AUTHORITY), 123);
+ grpc_channel_args args = {1, &arg};
+ EXPECT_FALSE(ClientAuthorityFilter::Create(&args).ok());
+}
+
+TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) {
+ auto filter = *ClientAuthorityFilter::Create(
+ TestChannelArgs("foo.test.google.au").args());
+ auto arena = MakeScopedArena(1024, g_memory_allocator);
+ grpc_metadata_batch initial_metadata_batch(arena.get());
+ grpc_metadata_batch trailing_metadata_batch(arena.get());
+ bool seen = false;
+ // TODO(ctiller): use Activity here, once it's ready.
+ promise_detail::Context context(arena.get());
+ auto promise = filter.MakeCallPromise(
+ ClientInitialMetadata::TestOnlyWrap(&initial_metadata_batch),
+ [&](ClientInitialMetadata initial_metadata) {
+ EXPECT_EQ(initial_metadata->get_pointer(HttpAuthorityMetadata())
+ ->as_string_view(),
+ "foo.test.google.au");
+ seen = true;
+ return ArenaPromise([&]() -> Poll {
+ return TrailingMetadata::TestOnlyWrap(&trailing_metadata_batch);
+ });
+ });
+ auto result = promise();
+ EXPECT_TRUE(absl::get_if(&result) != nullptr);
+ EXPECT_TRUE(seen);
+}
+
+TEST(ClientAuthorityFilterTest,
+ PromiseCompletesImmediatelyAndDoesNotClobberAlreadySetsAuthority) {
+ auto filter = *ClientAuthorityFilter::Create(
+ TestChannelArgs("foo.test.google.au").args());
+ auto arena = MakeScopedArena(1024, g_memory_allocator);
+ grpc_metadata_batch initial_metadata_batch(arena.get());
+ grpc_metadata_batch trailing_metadata_batch(arena.get());
+ initial_metadata_batch.Set(HttpAuthorityMetadata(),
+ Slice::FromStaticString("bar.test.google.au"));
+ bool seen = false;
+ // TODO(ctiller): use Activity here, once it's ready.
+ promise_detail::Context context(arena.get());
+ auto promise = filter.MakeCallPromise(
+ ClientInitialMetadata::TestOnlyWrap(&initial_metadata_batch),
+ [&](ClientInitialMetadata initial_metadata) {
+ EXPECT_EQ(initial_metadata->get_pointer(HttpAuthorityMetadata())
+ ->as_string_view(),
+ "bar.test.google.au");
+ seen = true;
+ return ArenaPromise([&]() -> Poll {
+ return TrailingMetadata::TestOnlyWrap(&trailing_metadata_batch);
+ });
+ });
+ auto result = promise();
+ EXPECT_TRUE(absl::get_if(&result) != nullptr);
+ EXPECT_TRUE(seen);
+}
+
+} // namespace
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 3d489fe6d4f..07e0ca9bd13 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1778,6 +1778,7 @@ src/core/lib/channel/handshaker.h \
src/core/lib/channel/handshaker_factory.h \
src/core/lib/channel/handshaker_registry.cc \
src/core/lib/channel/handshaker_registry.h \
+src/core/lib/channel/promise_based_filter.h \
src/core/lib/channel/status_util.cc \
src/core/lib/channel/status_util.h \
src/core/lib/compression/compression.cc \
@@ -2072,6 +2073,7 @@ src/core/lib/profiling/stap_timers.cc \
src/core/lib/profiling/timers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
+src/core/lib/promise/arena_promise.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
@@ -2082,6 +2084,7 @@ src/core/lib/promise/exec_ctx_wakeup_scheduler.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/poll.h \
+src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
src/core/lib/resolver/resolver.cc \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index f3b325c106f..d60c66ce819 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1574,6 +1574,7 @@ src/core/lib/channel/handshaker.h \
src/core/lib/channel/handshaker_factory.h \
src/core/lib/channel/handshaker_registry.cc \
src/core/lib/channel/handshaker_registry.h \
+src/core/lib/channel/promise_based_filter.h \
src/core/lib/channel/status_util.cc \
src/core/lib/channel/status_util.h \
src/core/lib/compression/compression.cc \
@@ -1871,6 +1872,7 @@ src/core/lib/profiling/stap_timers.cc \
src/core/lib/profiling/timers.h \
src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
+src/core/lib/promise/arena_promise.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
@@ -1881,6 +1883,7 @@ src/core/lib/promise/exec_ctx_wakeup_scheduler.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/poll.h \
+src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
src/core/lib/resolver/resolver.cc \
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index d5b6104a6b7..83cb3139f86 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -3615,6 +3615,30 @@
],
"uses_polling": true
},
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "client_authority_filter_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": false
+ },
{
"args": [],
"benchmark": false,