Call finalization for promises (#29008)

* Call finalization for promises

* comment

* split out and test

* dont use promise_detail:: directly

* fix

* Automated change: Fix sanity tests

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/29016/head
Craig Tiller 3 years ago committed by GitHub
parent a04739095e
commit 047642c5c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 36
      CMakeLists.txt
  3. 16
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 86
      src/core/lib/channel/call_finalization.h
  9. 21
      src/core/lib/channel/promise_based_filter.h
  10. 15
      test/core/channel/BUILD
  11. 50
      test/core/channel/call_finalization_test.cc
  12. 1
      test/core/filters/BUILD
  13. 5
      test/core/filters/client_authority_filter_test.cc
  14. 13
      test/core/promise/BUILD
  15. 7
      test/core/promise/arena_promise_test.cc
  16. 27
      test/core/promise/test_context.h
  17. 1
      tools/doxygen/Doxyfile.c++.internal
  18. 1
      tools/doxygen/Doxyfile.core.internal
  19. 24
      tools/run_tests/generated/tests.json

@ -2000,6 +2000,7 @@ grpc_cc_library(
"src/core/lib/transport/http2_errors.h",
"src/core/lib/address_utils/parse_address.h",
"src/core/lib/backoff/backoff.h",
"src/core/lib/channel/call_finalization.h",
"src/core/lib/channel/call_tracer.h",
"src/core/lib/channel/channel_stack.h",
"src/core/lib/channel/promise_based_filter.h",

36
CMakeLists.txt generated

@ -798,6 +798,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx bitset_test)
add_dependencies(buildtests_cxx byte_buffer_test)
add_dependencies(buildtests_cxx byte_stream_test)
add_dependencies(buildtests_cxx call_finalization_test)
add_dependencies(buildtests_cxx cancel_ares_query_test)
add_dependencies(buildtests_cxx capture_test)
add_dependencies(buildtests_cxx cel_authorization_engine_test)
@ -8327,6 +8328,41 @@ target_link_libraries(byte_stream_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(call_finalization_test
test/core/channel/call_finalization_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(call_finalization_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(call_finalization_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -816,6 +816,7 @@ libs:
- src/core/lib/address_utils/sockaddr_utils.h
- src/core/lib/avl/avl.h
- src/core/lib/backoff/backoff.h
- src/core/lib/channel/call_finalization.h
- src/core/lib/channel/call_tracer.h
- src/core/lib/channel/channel_args.h
- src/core/lib/channel/channel_args_preconditioning.h
@ -1986,6 +1987,7 @@ libs:
- src/core/lib/address_utils/sockaddr_utils.h
- src/core/lib/avl/avl.h
- src/core/lib/backoff/backoff.h
- src/core/lib/channel/call_finalization.h
- src/core/lib/channel/call_tracer.h
- src/core/lib/channel/channel_args.h
- src/core/lib/channel/channel_args_preconditioning.h
@ -4569,6 +4571,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
- test/core/promise/test_context.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
@ -4860,6 +4863,16 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: call_finalization_test
gtest: true
build: test
language: c++
headers:
- test/core/promise/test_context.h
src:
- test/core/channel/call_finalization_test.cc
deps:
- grpc_test_util
- name: cancel_ares_query_test
gtest: true
build: test
@ -5127,7 +5140,8 @@ targets:
gtest: true
build: test
language: c++
headers: []
headers:
- test/core/promise/test_context.h
src:
- test/core/filters/client_authority_filter_test.cc
deps:

2
gRPC-C++.podspec generated

@ -634,6 +634,7 @@ Pod::Spec.new do |s|
'src/core/lib/address_utils/sockaddr_utils.h',
'src/core/lib/avl/avl.h',
'src/core/lib/backoff/backoff.h',
'src/core/lib/channel/call_finalization.h',
'src/core/lib/channel/call_tracer.h',
'src/core/lib/channel/channel_args.h',
'src/core/lib/channel/channel_args_preconditioning.h',
@ -1433,6 +1434,7 @@ Pod::Spec.new do |s|
'src/core/lib/address_utils/sockaddr_utils.h',
'src/core/lib/avl/avl.h',
'src/core/lib/backoff/backoff.h',
'src/core/lib/channel/call_finalization.h',
'src/core/lib/channel/call_tracer.h',
'src/core/lib/channel/channel_args.h',
'src/core/lib/channel/channel_args_preconditioning.h',

2
gRPC-Core.podspec generated

@ -970,6 +970,7 @@ Pod::Spec.new do |s|
'src/core/lib/avl/avl.h',
'src/core/lib/backoff/backoff.cc',
'src/core/lib/backoff/backoff.h',
'src/core/lib/channel/call_finalization.h',
'src/core/lib/channel/call_tracer.h',
'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_args.h',
@ -2030,6 +2031,7 @@ Pod::Spec.new do |s|
'src/core/lib/address_utils/sockaddr_utils.h',
'src/core/lib/avl/avl.h',
'src/core/lib/backoff/backoff.h',
'src/core/lib/channel/call_finalization.h',
'src/core/lib/channel/call_tracer.h',
'src/core/lib/channel/channel_args.h',
'src/core/lib/channel/channel_args_preconditioning.h',

1
grpc.gemspec generated

@ -889,6 +889,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/avl/avl.h )
s.files += %w( src/core/lib/backoff/backoff.cc )
s.files += %w( src/core/lib/backoff/backoff.h )
s.files += %w( src/core/lib/channel/call_finalization.h )
s.files += %w( src/core/lib/channel/call_tracer.h )
s.files += %w( src/core/lib/channel/channel_args.cc )
s.files += %w( src/core/lib/channel/channel_args.h )

1
package.xml generated

@ -869,6 +869,7 @@
<file baseinstalldir="/" name="src/core/lib/avl/avl.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/backoff/backoff.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/backoff/backoff.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/call_finalization.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/call_tracer.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_args.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/channel_args.h" role="src" />

@ -0,0 +1,86 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H
#define GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
// Call finalization context.
// Sometimes a filter needs to perform some operation after the last byte of
// data is flushed to the wire. This context is used to perform that
// finalization.
// Filters can register a finalizer by calling Add().
// The finalizer will be called before the call is destroyed but after
// the top level promise is completed.
class CallFinalization {
public:
// Add a step to the finalization context.
// Takes a callable with a signature compatible with:
// (const grpc_call_final_info&) -> void.
// Finalizers are run in the reverse order they are added.
template <typename F>
void Add(F&& t) {
first_ =
GetContext<Arena>()->New<FuncFinalizer<F>>(std::forward<F>(t), first_);
}
void Run(const grpc_call_final_info* final_info) {
if (Finalizer* f = absl::exchange(first_, nullptr)) f->Run(final_info);
}
private:
// Base class for finalizer implementations.
class Finalizer {
public:
// Run the finalizer and call the destructor of this Finalizer.
virtual void Run(const grpc_call_final_info* final_info) = 0;
protected:
~Finalizer() {}
};
// Specialization for callable objects.
template <typename F>
class FuncFinalizer final : public Finalizer {
public:
FuncFinalizer(F&& f, Finalizer* next)
: next_(next), f_(std::forward<F>(f)) {}
void Run(const grpc_call_final_info* final_info) override {
f_(final_info);
Finalizer* next = next_;
this->~FuncFinalizer();
if (next != nullptr) next->Run(final_info);
}
private:
Finalizer* next_;
F f_;
};
// The first finalizer in the chain.
Finalizer* first_ = nullptr;
};
template <>
struct ContextType<CallFinalization> {};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H

@ -26,11 +26,13 @@
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/transport/error_utils.h"
@ -95,17 +97,24 @@ class BaseCallData : public Activity, private Wakeable {
Waker MakeNonOwningWaker() final;
Waker MakeOwningWaker() final;
void Finalize(const grpc_call_final_info* final_info) {
finalization_.Run(final_info);
}
protected:
class ScopedContext
: public promise_detail::Context<Arena>,
public promise_detail::Context<grpc_call_context_element>,
public promise_detail::Context<grpc_polling_entity> {
public promise_detail::Context<grpc_polling_entity>,
public promise_detail::Context<CallFinalization> {
public:
explicit ScopedContext(BaseCallData* call_data)
: promise_detail::Context<Arena>(call_data->arena_),
promise_detail::Context<grpc_call_context_element>(
call_data->context_),
promise_detail::Context<grpc_polling_entity>(call_data->pollent_) {}
promise_detail::Context<grpc_polling_entity>(call_data->pollent_),
promise_detail::Context<CallFinalization>(&call_data->finalization_) {
}
};
static MetadataHandle<grpc_metadata_batch> WrapMetadata(
@ -135,6 +144,7 @@ class BaseCallData : public Activity, private Wakeable {
Arena* const arena_;
CallCombiner* const call_combiner_;
const Timestamp deadline_;
CallFinalization finalization_;
grpc_call_context_element* const context_;
grpc_polling_entity* pollent_ = nullptr;
};
@ -380,8 +390,11 @@ MakePromiseBasedFilter(const char* name) {
static_cast<CallData*>(elem->call_data)->set_pollent(pollent);
},
// destroy_call_elem
[](grpc_call_element* elem, const grpc_call_final_info*, grpc_closure*) {
static_cast<CallData*>(elem->call_data)->~CallData();
[](grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure*) {
auto* cd = static_cast<CallData*>(elem->call_data);
cd->Finalize(final_info);
cd->~CallData();
},
// sizeof_channel_data
sizeof(F),

@ -129,3 +129,18 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "call_finalization_test",
srcs = ["call_finalization_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/promise:test_context",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,50 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/channel/call_finalization.h"
#include <gtest/gtest.h>
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_context.h"
namespace grpc_core {
static auto* g_memory_allocator = new MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
TEST(CallFinalizationTest, Works) {
auto arena = MakeScopedArena(1024, g_memory_allocator);
std::string evidence;
TestContext<Arena> context(arena.get());
CallFinalization finalization;
auto p = std::make_shared<int>(42);
finalization.Add([&evidence, p](const grpc_call_final_info* final_info) {
evidence += absl::StrCat("FIRST", final_info->error_string, *p, "\n");
});
finalization.Add([&evidence, p](const grpc_call_final_info* final_info) {
evidence += absl::StrCat("SECOND", final_info->error_string, *p, "\n");
});
grpc_call_final_info final_info{};
final_info.error_string = "123";
finalization.Run(&final_info);
EXPECT_EQ(evidence, "SECOND12342\nFIRST12342\n");
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -27,6 +27,7 @@ grpc_cc_test(
deps = [
"//:grpc",
"//:grpc_client_authority_filter",
"//test/core/promise:test_context",
"//test/core/util:grpc_suppressions",
],
)

@ -17,6 +17,7 @@
#include <gtest/gtest.h>
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_context.h"
namespace grpc_core {
namespace {
@ -68,7 +69,7 @@ TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) {
grpc_metadata_batch trailing_metadata_batch(arena.get());
bool seen = false;
// TODO(ctiller): use Activity here, once it's ready.
promise_detail::Context<Arena> context(arena.get());
TestContext<Arena> context(arena.get());
auto promise = filter.MakeCallPromise(
ClientInitialMetadata::TestOnlyWrap(&initial_metadata_batch),
[&](ClientInitialMetadata initial_metadata) {
@ -96,7 +97,7 @@ TEST(ClientAuthorityFilterTest,
Slice::FromStaticString("bar.test.google.au"));
bool seen = false;
// TODO(ctiller): use Activity here, once it's ready.
promise_detail::Context<Arena> context(arena.get());
TestContext<Arena> context(arena.get());
auto promise = filter.MakeCallPromise(
ClientInitialMetadata::TestOnlyWrap(&initial_metadata_batch),
[&](ClientInitialMetadata initial_metadata) {

@ -31,6 +31,18 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "test_context",
testonly = True,
hdrs = ["test_context.h"],
external_deps = ["gtest"],
visibility = ["//test/core:__subpackages__"],
deps = [
"//:gpr_base",
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "poll_test",
srcs = ["poll_test.cc"],
@ -74,6 +86,7 @@ grpc_cc_test(
language = "c++",
uses_polling = False,
deps = [
":test_context",
"//:arena_promise",
"//:resource_quota",
"//test/core/util:grpc_suppressions",

@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_context.h"
namespace grpc_core {
@ -27,7 +28,7 @@ static auto* g_memory_allocator = new MemoryAllocator(
TEST(ArenaPromiseTest, AllocatedWorks) {
auto arena = MakeScopedArena(1024, g_memory_allocator);
promise_detail::Context<Arena> context(arena.get());
TestContext<Arena> context(arena.get());
int x = 42;
ArenaPromise<int> p([x] { return Poll<int>(x); });
EXPECT_EQ(p(), Poll<int>(42));
@ -37,7 +38,7 @@ TEST(ArenaPromiseTest, AllocatedWorks) {
TEST(ArenaPromiseTest, DestructionWorks) {
auto arena = MakeScopedArena(1024, g_memory_allocator);
promise_detail::Context<Arena> context(arena.get());
TestContext<Arena> context(arena.get());
auto x = std::make_shared<int>(42);
auto p = ArenaPromise<int>([x] { return Poll<int>(*x); });
ArenaPromise<int> q(std::move(p));
@ -46,7 +47,7 @@ TEST(ArenaPromiseTest, DestructionWorks) {
TEST(ArenaPromiseTest, MoveAssignmentWorks) {
auto arena = MakeScopedArena(1024, g_memory_allocator);
promise_detail::Context<Arena> context(arena.get());
TestContext<Arena> context(arena.get());
auto x = std::make_shared<int>(42);
auto p = ArenaPromise<int>([x] { return Poll<int>(*x); });
p = ArenaPromise<int>();

@ -0,0 +1,27 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef TEST_PROMISE_H
#define TEST_PROMISE_H
#include "src/core/lib/promise/context.h"
namespace grpc_core {
template <class T>
using TestContext = promise_detail::Context<T>;
} // namespace grpc_core
#endif

@ -1868,6 +1868,7 @@ src/core/lib/address_utils/sockaddr_utils.h \
src/core/lib/avl/avl.h \
src/core/lib/backoff/backoff.cc \
src/core/lib/backoff/backoff.h \
src/core/lib/channel/call_finalization.h \
src/core/lib/channel/call_tracer.h \
src/core/lib/channel/channel_args.cc \
src/core/lib/channel/channel_args.h \

@ -1660,6 +1660,7 @@ src/core/lib/avl/avl.h \
src/core/lib/backoff/backoff.cc \
src/core/lib/backoff/backoff.h \
src/core/lib/channel/README.md \
src/core/lib/channel/call_finalization.h \
src/core/lib/channel/call_tracer.h \
src/core/lib/channel/channel_args.cc \
src/core/lib/channel/channel_args.h \

@ -3135,6 +3135,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "call_finalization_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save