diff --git a/BUILD b/BUILD index fdc7eaa8447..3851f177bab 100644 --- a/BUILD +++ b/BUILD @@ -2000,6 +2000,7 @@ grpc_cc_library( "src/core/lib/transport/http2_errors.h", "src/core/lib/address_utils/parse_address.h", "src/core/lib/backoff/backoff.h", + "src/core/lib/channel/call_finalization.h", "src/core/lib/channel/call_tracer.h", "src/core/lib/channel/channel_stack.h", "src/core/lib/channel/promise_based_filter.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 62d30294174..4eb8bfc8db3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -798,6 +798,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx bitset_test) add_dependencies(buildtests_cxx byte_buffer_test) add_dependencies(buildtests_cxx byte_stream_test) + add_dependencies(buildtests_cxx call_finalization_test) add_dependencies(buildtests_cxx cancel_ares_query_test) add_dependencies(buildtests_cxx capture_test) add_dependencies(buildtests_cxx cel_authorization_engine_test) @@ -8327,6 +8328,41 @@ target_link_libraries(byte_stream_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(call_finalization_test + test/core/channel/call_finalization_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(call_finalization_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(call_finalization_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 07dc9ab2f50..f2ae97e39aa 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -816,6 +816,7 @@ libs: - src/core/lib/address_utils/sockaddr_utils.h - src/core/lib/avl/avl.h - src/core/lib/backoff/backoff.h + - src/core/lib/channel/call_finalization.h - src/core/lib/channel/call_tracer.h - src/core/lib/channel/channel_args.h - src/core/lib/channel/channel_args_preconditioning.h @@ -1986,6 +1987,7 @@ libs: - src/core/lib/address_utils/sockaddr_utils.h - src/core/lib/avl/avl.h - src/core/lib/backoff/backoff.h + - src/core/lib/channel/call_finalization.h - src/core/lib/channel/call_tracer.h - src/core/lib/channel/channel_args.h - src/core/lib/channel/channel_args_preconditioning.h @@ -4569,6 +4571,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_refcount_base.h - src/core/lib/slice/slice_string_helpers.h + - test/core/promise/test_context.h src: - src/core/lib/debug/trace.cc - src/core/lib/event_engine/memory_allocator.cc @@ -4860,6 +4863,16 @@ targets: deps: - grpc_test_util uses_polling: false +- name: call_finalization_test + gtest: true + build: test + language: c++ + headers: + - test/core/promise/test_context.h + src: + - test/core/channel/call_finalization_test.cc + deps: + - grpc_test_util - name: cancel_ares_query_test gtest: true build: test @@ -5127,7 +5140,8 @@ targets: gtest: true build: test language: c++ - headers: [] + headers: + - test/core/promise/test_context.h src: - test/core/filters/client_authority_filter_test.cc deps: diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 35fd3f8286e..84556979b83 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -634,6 +634,7 @@ Pod::Spec.new do |s| 'src/core/lib/address_utils/sockaddr_utils.h', 'src/core/lib/avl/avl.h', 'src/core/lib/backoff/backoff.h', + 'src/core/lib/channel/call_finalization.h', 'src/core/lib/channel/call_tracer.h', 'src/core/lib/channel/channel_args.h', 'src/core/lib/channel/channel_args_preconditioning.h', @@ -1433,6 +1434,7 @@ Pod::Spec.new do |s| 'src/core/lib/address_utils/sockaddr_utils.h', 'src/core/lib/avl/avl.h', 'src/core/lib/backoff/backoff.h', + 'src/core/lib/channel/call_finalization.h', 'src/core/lib/channel/call_tracer.h', 'src/core/lib/channel/channel_args.h', 'src/core/lib/channel/channel_args_preconditioning.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 93b39a97698..238cee95a3d 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -970,6 +970,7 @@ Pod::Spec.new do |s| 'src/core/lib/avl/avl.h', 'src/core/lib/backoff/backoff.cc', 'src/core/lib/backoff/backoff.h', + 'src/core/lib/channel/call_finalization.h', 'src/core/lib/channel/call_tracer.h', 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_args.h', @@ -2030,6 +2031,7 @@ Pod::Spec.new do |s| 'src/core/lib/address_utils/sockaddr_utils.h', 'src/core/lib/avl/avl.h', 'src/core/lib/backoff/backoff.h', + 'src/core/lib/channel/call_finalization.h', 'src/core/lib/channel/call_tracer.h', 'src/core/lib/channel/channel_args.h', 'src/core/lib/channel/channel_args_preconditioning.h', diff --git a/grpc.gemspec b/grpc.gemspec index 1a84d9eb853..4846fe4ded9 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -889,6 +889,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/avl/avl.h ) s.files += %w( src/core/lib/backoff/backoff.cc ) s.files += %w( src/core/lib/backoff/backoff.h ) + s.files += %w( src/core/lib/channel/call_finalization.h ) s.files += %w( src/core/lib/channel/call_tracer.h ) s.files += %w( src/core/lib/channel/channel_args.cc ) s.files += %w( src/core/lib/channel/channel_args.h ) diff --git a/package.xml b/package.xml index 013b15ebffa..df39b322e27 100644 --- a/package.xml +++ b/package.xml @@ -869,6 +869,7 @@ + diff --git a/src/core/lib/channel/call_finalization.h b/src/core/lib/channel/call_finalization.h new file mode 100644 index 00000000000..503f8c7c5a3 --- /dev/null +++ b/src/core/lib/channel/call_finalization.h @@ -0,0 +1,86 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H +#define GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H + +#include + +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/promise/context.h" +#include "src/core/lib/resource_quota/arena.h" + +namespace grpc_core { + +// Call finalization context. +// Sometimes a filter needs to perform some operation after the last byte of +// data is flushed to the wire. This context is used to perform that +// finalization. +// Filters can register a finalizer by calling Add(). +// The finalizer will be called before the call is destroyed but after +// the top level promise is completed. +class CallFinalization { + public: + // Add a step to the finalization context. + // Takes a callable with a signature compatible with: + // (const grpc_call_final_info&) -> void. + // Finalizers are run in the reverse order they are added. + template + void Add(F&& t) { + first_ = + GetContext()->New>(std::forward(t), first_); + } + + void Run(const grpc_call_final_info* final_info) { + if (Finalizer* f = absl::exchange(first_, nullptr)) f->Run(final_info); + } + + private: + // Base class for finalizer implementations. + class Finalizer { + public: + // Run the finalizer and call the destructor of this Finalizer. + virtual void Run(const grpc_call_final_info* final_info) = 0; + + protected: + ~Finalizer() {} + }; + // Specialization for callable objects. + template + class FuncFinalizer final : public Finalizer { + public: + FuncFinalizer(F&& f, Finalizer* next) + : next_(next), f_(std::forward(f)) {} + + void Run(const grpc_call_final_info* final_info) override { + f_(final_info); + Finalizer* next = next_; + this->~FuncFinalizer(); + if (next != nullptr) next->Run(final_info); + } + + private: + Finalizer* next_; + F f_; + }; + // The first finalizer in the chain. + Finalizer* first_ = nullptr; +}; + +template <> +struct ContextType {}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 8088ae16f61..1e18bf70217 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -26,11 +26,13 @@ #include #include +#include "src/core/lib/channel/call_finalization.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/promise/arena_promise.h" +#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/promise.h" #include "src/core/lib/transport/error_utils.h" @@ -95,17 +97,24 @@ class BaseCallData : public Activity, private Wakeable { Waker MakeNonOwningWaker() final; Waker MakeOwningWaker() final; + void Finalize(const grpc_call_final_info* final_info) { + finalization_.Run(final_info); + } + protected: class ScopedContext : public promise_detail::Context, public promise_detail::Context, - public promise_detail::Context { + public promise_detail::Context, + public promise_detail::Context { public: explicit ScopedContext(BaseCallData* call_data) : promise_detail::Context(call_data->arena_), promise_detail::Context( call_data->context_), - promise_detail::Context(call_data->pollent_) {} + promise_detail::Context(call_data->pollent_), + promise_detail::Context(&call_data->finalization_) { + } }; static MetadataHandle WrapMetadata( @@ -135,6 +144,7 @@ class BaseCallData : public Activity, private Wakeable { Arena* const arena_; CallCombiner* const call_combiner_; const Timestamp deadline_; + CallFinalization finalization_; grpc_call_context_element* const context_; grpc_polling_entity* pollent_ = nullptr; }; @@ -380,8 +390,11 @@ MakePromiseBasedFilter(const char* name) { static_cast(elem->call_data)->set_pollent(pollent); }, // destroy_call_elem - [](grpc_call_element* elem, const grpc_call_final_info*, grpc_closure*) { - static_cast(elem->call_data)->~CallData(); + [](grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure*) { + auto* cd = static_cast(elem->call_data); + cd->Finalize(final_info); + cd->~CallData(); }, // sizeof_channel_data sizeof(F), diff --git a/test/core/channel/BUILD b/test/core/channel/BUILD index fba0fe3f377..f91e914693e 100644 --- a/test/core/channel/BUILD +++ b/test/core/channel/BUILD @@ -129,3 +129,18 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) + +grpc_cc_test( + name = "call_finalization_test", + srcs = ["call_finalization_test.cc"], + external_deps = [ + "gtest", + ], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/promise:test_context", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/channel/call_finalization_test.cc b/test/core/channel/call_finalization_test.cc new file mode 100644 index 00000000000..61d92c5f563 --- /dev/null +++ b/test/core/channel/call_finalization_test.cc @@ -0,0 +1,50 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/channel/call_finalization.h" + +#include + +#include "src/core/lib/resource_quota/resource_quota.h" +#include "test/core/promise/test_context.h" + +namespace grpc_core { + +static auto* g_memory_allocator = new MemoryAllocator( + ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test")); + +TEST(CallFinalizationTest, Works) { + auto arena = MakeScopedArena(1024, g_memory_allocator); + std::string evidence; + TestContext context(arena.get()); + CallFinalization finalization; + auto p = std::make_shared(42); + finalization.Add([&evidence, p](const grpc_call_final_info* final_info) { + evidence += absl::StrCat("FIRST", final_info->error_string, *p, "\n"); + }); + finalization.Add([&evidence, p](const grpc_call_final_info* final_info) { + evidence += absl::StrCat("SECOND", final_info->error_string, *p, "\n"); + }); + grpc_call_final_info final_info{}; + final_info.error_string = "123"; + finalization.Run(&final_info); + EXPECT_EQ(evidence, "SECOND12342\nFIRST12342\n"); +} + +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/filters/BUILD b/test/core/filters/BUILD index 39765aa0801..0d187dbb1ec 100644 --- a/test/core/filters/BUILD +++ b/test/core/filters/BUILD @@ -27,6 +27,7 @@ grpc_cc_test( deps = [ "//:grpc", "//:grpc_client_authority_filter", + "//test/core/promise:test_context", "//test/core/util:grpc_suppressions", ], ) diff --git a/test/core/filters/client_authority_filter_test.cc b/test/core/filters/client_authority_filter_test.cc index 136f5b6f2b4..023c64addf6 100644 --- a/test/core/filters/client_authority_filter_test.cc +++ b/test/core/filters/client_authority_filter_test.cc @@ -17,6 +17,7 @@ #include #include "src/core/lib/resource_quota/resource_quota.h" +#include "test/core/promise/test_context.h" namespace grpc_core { namespace { @@ -68,7 +69,7 @@ TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) { grpc_metadata_batch trailing_metadata_batch(arena.get()); bool seen = false; // TODO(ctiller): use Activity here, once it's ready. - promise_detail::Context context(arena.get()); + TestContext context(arena.get()); auto promise = filter.MakeCallPromise( ClientInitialMetadata::TestOnlyWrap(&initial_metadata_batch), [&](ClientInitialMetadata initial_metadata) { @@ -96,7 +97,7 @@ TEST(ClientAuthorityFilterTest, Slice::FromStaticString("bar.test.google.au")); bool seen = false; // TODO(ctiller): use Activity here, once it's ready. - promise_detail::Context context(arena.get()); + TestContext context(arena.get()); auto promise = filter.MakeCallPromise( ClientInitialMetadata::TestOnlyWrap(&initial_metadata_batch), [&](ClientInitialMetadata initial_metadata) { diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 6e6e98f903a..106085b99b4 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -31,6 +31,18 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "test_context", + testonly = True, + hdrs = ["test_context.h"], + external_deps = ["gtest"], + visibility = ["//test/core:__subpackages__"], + deps = [ + "//:gpr_base", + "//test/core/util:grpc_suppressions", + ], +) + grpc_cc_test( name = "poll_test", srcs = ["poll_test.cc"], @@ -74,6 +86,7 @@ grpc_cc_test( language = "c++", uses_polling = False, deps = [ + ":test_context", "//:arena_promise", "//:resource_quota", "//test/core/util:grpc_suppressions", diff --git a/test/core/promise/arena_promise_test.cc b/test/core/promise/arena_promise_test.cc index 20d564da017..e84e82e0353 100644 --- a/test/core/promise/arena_promise_test.cc +++ b/test/core/promise/arena_promise_test.cc @@ -19,6 +19,7 @@ #include #include "src/core/lib/resource_quota/resource_quota.h" +#include "test/core/promise/test_context.h" namespace grpc_core { @@ -27,7 +28,7 @@ static auto* g_memory_allocator = new MemoryAllocator( TEST(ArenaPromiseTest, AllocatedWorks) { auto arena = MakeScopedArena(1024, g_memory_allocator); - promise_detail::Context context(arena.get()); + TestContext context(arena.get()); int x = 42; ArenaPromise p([x] { return Poll(x); }); EXPECT_EQ(p(), Poll(42)); @@ -37,7 +38,7 @@ TEST(ArenaPromiseTest, AllocatedWorks) { TEST(ArenaPromiseTest, DestructionWorks) { auto arena = MakeScopedArena(1024, g_memory_allocator); - promise_detail::Context context(arena.get()); + TestContext context(arena.get()); auto x = std::make_shared(42); auto p = ArenaPromise([x] { return Poll(*x); }); ArenaPromise q(std::move(p)); @@ -46,7 +47,7 @@ TEST(ArenaPromiseTest, DestructionWorks) { TEST(ArenaPromiseTest, MoveAssignmentWorks) { auto arena = MakeScopedArena(1024, g_memory_allocator); - promise_detail::Context context(arena.get()); + TestContext context(arena.get()); auto x = std::make_shared(42); auto p = ArenaPromise([x] { return Poll(*x); }); p = ArenaPromise(); diff --git a/test/core/promise/test_context.h b/test/core/promise/test_context.h new file mode 100644 index 00000000000..669dc91364b --- /dev/null +++ b/test/core/promise/test_context.h @@ -0,0 +1,27 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TEST_PROMISE_H +#define TEST_PROMISE_H + +#include "src/core/lib/promise/context.h" + +namespace grpc_core { + +template +using TestContext = promise_detail::Context; + +} // namespace grpc_core + +#endif diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 5113ff5a22e..b9796a2a17f 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1868,6 +1868,7 @@ src/core/lib/address_utils/sockaddr_utils.h \ src/core/lib/avl/avl.h \ src/core/lib/backoff/backoff.cc \ src/core/lib/backoff/backoff.h \ +src/core/lib/channel/call_finalization.h \ src/core/lib/channel/call_tracer.h \ src/core/lib/channel/channel_args.cc \ src/core/lib/channel/channel_args.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index f88ad11812e..dd7c1ebcd06 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1660,6 +1660,7 @@ src/core/lib/avl/avl.h \ src/core/lib/backoff/backoff.cc \ src/core/lib/backoff/backoff.h \ src/core/lib/channel/README.md \ +src/core/lib/channel/call_finalization.h \ src/core/lib/channel/call_tracer.h \ src/core/lib/channel/channel_args.cc \ src/core/lib/channel/channel_args.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 9482a6c9050..daffc3fac5a 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3135,6 +3135,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "call_finalization_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,