mirror of https://github.com/grpc/grpc.git
* Revert "[promises] Server call (#31448)"
This reverts commit bbeb15006a
.
* clang-fmt
pull/32267/head
parent
9378a7d262
commit
dbfb888e1f
85 changed files with 1742 additions and 4987 deletions
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,27 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/closure.h" |
||||
|
||||
#include "absl/strings/str_format.h" |
||||
|
||||
std::string grpc_closure::DebugString() const { |
||||
#ifdef NDEBUG |
||||
return absl::StrFormat("%p", this); |
||||
#else |
||||
return absl::StrFormat("%p|created=%s:%d", this, file_created, line_created); |
||||
#endif |
||||
} |
@ -1,309 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H |
||||
#define GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <algorithm> |
||||
#include <new> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/types/optional.h" |
||||
#include "absl/types/variant.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/construct_destruct.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/detail/promise_factory.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/trace.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Tracks a list of maps of T -> optional<T> via promises.
|
||||
// When Run, runs each transformation in order, and resolves to the ulimate
|
||||
// result.
|
||||
// If a map resolves to nullopt, the chain is terminated and the result is
|
||||
// nullopt.
|
||||
// Maps can also have synchronous cleanup functions, which are guaranteed to be
|
||||
// called at the termination of each run through the chain.
|
||||
template <typename T> |
||||
class InterceptorList { |
||||
private: |
||||
// A map of T -> T via promises.
|
||||
class Map { |
||||
public: |
||||
explicit Map(DebugLocation from) : from_(from) {} |
||||
// Construct a promise to transform x into some other value at memory.
|
||||
virtual void MakePromise(T x, void* memory) = 0; |
||||
// Destroy a promise constructed at memory.
|
||||
virtual void Destroy(void* memory) = 0; |
||||
// Poll a promise constructed at memory.
|
||||
// Resolves to an optional<T> -- if nullopt it means terminate the chain and
|
||||
// resolve.
|
||||
virtual Poll<absl::optional<T>> PollOnce(void* memory) = 0; |
||||
virtual ~Map() = default; |
||||
|
||||
// Update the next pointer stored with this map.
|
||||
// This is only valid to call once, and only before the map is used.
|
||||
void SetNext(Map* next) { |
||||
GPR_DEBUG_ASSERT(next_ == nullptr); |
||||
next_ = next; |
||||
} |
||||
|
||||
// Access the creation location for this map (for debug tracing).
|
||||
DebugLocation from() const { return from_; } |
||||
|
||||
// Access the next map in the chain (or nullptr if this is the last map).
|
||||
Map* next() const { return next_; } |
||||
|
||||
private: |
||||
GPR_NO_UNIQUE_ADDRESS const DebugLocation from_; |
||||
Map* next_ = nullptr; |
||||
}; |
||||
|
||||
public: |
||||
// The result of Run: a promise that will execute the entire chain.
|
||||
class RunPromise { |
||||
public: |
||||
RunPromise(size_t memory_required, Map* factory, absl::optional<T> value) { |
||||
if (!value.has_value() || factory == nullptr) { |
||||
is_immediately_resolved_ = true; |
||||
Construct(&result_, std::move(value)); |
||||
} else { |
||||
is_immediately_resolved_ = false; |
||||
Construct(&async_resolution_, memory_required); |
||||
factory->MakePromise(std::move(*value), async_resolution_.space.get()); |
||||
async_resolution_.current_factory = factory; |
||||
} |
||||
} |
||||
|
||||
~RunPromise() { |
||||
if (is_immediately_resolved_) { |
||||
Destruct(&result_); |
||||
} else { |
||||
if (async_resolution_.current_factory != nullptr) { |
||||
async_resolution_.current_factory->Destroy( |
||||
async_resolution_.space.get()); |
||||
} |
||||
Destruct(&async_resolution_); |
||||
} |
||||
} |
||||
|
||||
RunPromise(const RunPromise&) = delete; |
||||
RunPromise& operator=(const RunPromise&) = delete; |
||||
|
||||
RunPromise(RunPromise&& other) noexcept |
||||
: is_immediately_resolved_(other.is_immediately_resolved_) { |
||||
if (is_immediately_resolved_) { |
||||
Construct(&result_, std::move(other.result_)); |
||||
} else { |
||||
Construct(&async_resolution_, std::move(other.async_resolution_)); |
||||
} |
||||
} |
||||
|
||||
RunPromise& operator=(RunPromise&& other) noexcept = delete; |
||||
|
||||
Poll<absl::optional<T>> operator()() { |
||||
if (grpc_trace_promise_primitives.enabled()) { |
||||
gpr_log(GPR_DEBUG, "InterceptorList::RunPromise: %s", |
||||
DebugString().c_str()); |
||||
} |
||||
if (is_immediately_resolved_) return std::move(result_); |
||||
while (true) { |
||||
auto r = async_resolution_.current_factory->PollOnce( |
||||
async_resolution_.space.get()); |
||||
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) { |
||||
async_resolution_.current_factory->Destroy( |
||||
async_resolution_.space.get()); |
||||
async_resolution_.current_factory = |
||||
async_resolution_.current_factory->next(); |
||||
if (async_resolution_.current_factory == nullptr || !p->has_value()) { |
||||
return std::move(*p); |
||||
} |
||||
async_resolution_.current_factory->MakePromise( |
||||
std::move(**p), async_resolution_.space.get()); |
||||
continue; |
||||
} |
||||
return Pending{}; |
||||
} |
||||
} |
||||
|
||||
private: |
||||
std::string DebugString() const { |
||||
if (is_immediately_resolved_) { |
||||
return absl::StrFormat("Result:has_value:%d", result_.has_value()); |
||||
} else { |
||||
return absl::StrCat( |
||||
"Running:", |
||||
async_resolution_.current_factory == nullptr |
||||
? "END" |
||||
: ([p = async_resolution_.current_factory->from()]() { |
||||
return absl::StrCat(p.file(), ":", p.line()); |
||||
})() |
||||
.c_str()); |
||||
} |
||||
} |
||||
struct AsyncResolution { |
||||
explicit AsyncResolution(size_t max_size) |
||||
: space(GetContext<Arena>()->MakePooledArray<char>(max_size)) {} |
||||
AsyncResolution(const AsyncResolution&) = delete; |
||||
AsyncResolution& operator=(const AsyncResolution&) = delete; |
||||
AsyncResolution(AsyncResolution&& other) noexcept |
||||
: current_factory(std::exchange(other.current_factory, nullptr)), |
||||
space(std::move(other.space)) {} |
||||
Map* current_factory; |
||||
Arena::PoolPtr<char[]> space; |
||||
}; |
||||
union { |
||||
AsyncResolution async_resolution_; |
||||
absl::optional<T> result_; |
||||
}; |
||||
// If true, the result_ union is valid, otherwise async_resolution_ is.
|
||||
// Indicates whether the promise resolved immediately at construction or if
|
||||
// additional steps were needed.
|
||||
bool is_immediately_resolved_; |
||||
}; |
||||
|
||||
InterceptorList() = default; |
||||
InterceptorList(const InterceptorList&) = delete; |
||||
InterceptorList& operator=(const InterceptorList&) = delete; |
||||
~InterceptorList() { DeleteFactories(); } |
||||
|
||||
RunPromise Run(absl::optional<T> initial_value) { |
||||
return RunPromise(promise_memory_required_, first_map_, |
||||
std::move(initial_value)); |
||||
} |
||||
|
||||
// Append a new map to the end of the chain.
|
||||
template <typename Fn> |
||||
void AppendMap(Fn fn, DebugLocation from) { |
||||
Append(MakeMapToAdd( |
||||
std::move(fn), [] {}, from)); |
||||
} |
||||
|
||||
// Prepend a new map to the beginning of the chain.
|
||||
template <typename Fn> |
||||
void PrependMap(Fn fn, DebugLocation from) { |
||||
Prepend(MakeMapToAdd( |
||||
std::move(fn), [] {}, from)); |
||||
} |
||||
|
||||
// Append a new map to the end of the chain, with a cleanup function to be
|
||||
// called at the end of run promise execution.
|
||||
template <typename Fn, typename CleanupFn> |
||||
void AppendMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { |
||||
Append(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); |
||||
} |
||||
|
||||
// Prepend a new map to the beginning of the chain, with a cleanup function to
|
||||
// be called at the end of run promise execution.
|
||||
template <typename Fn, typename CleanupFn> |
||||
void PrependMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { |
||||
Prepend(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); |
||||
} |
||||
|
||||
protected: |
||||
// Clear the interceptor list
|
||||
void ResetInterceptorList() { |
||||
DeleteFactories(); |
||||
first_map_ = nullptr; |
||||
last_map_ = nullptr; |
||||
promise_memory_required_ = 0; |
||||
} |
||||
|
||||
private: |
||||
template <typename Fn, typename CleanupFn> |
||||
class MapImpl final : public Map { |
||||
public: |
||||
using PromiseFactory = promise_detail::RepeatedPromiseFactory<T, Fn>; |
||||
using Promise = typename PromiseFactory::Promise; |
||||
|
||||
explicit MapImpl(Fn fn, CleanupFn cleanup_fn, DebugLocation from) |
||||
: Map(from), fn_(std::move(fn)), cleanup_fn_(std::move(cleanup_fn)) {} |
||||
~MapImpl() override { cleanup_fn_(); } |
||||
void MakePromise(T x, void* memory) override { |
||||
new (memory) Promise(fn_.Make(std::move(x))); |
||||
} |
||||
void Destroy(void* memory) override { |
||||
static_cast<Promise*>(memory)->~Promise(); |
||||
} |
||||
Poll<absl::optional<T>> PollOnce(void* memory) override { |
||||
return poll_cast<absl::optional<T>>((*static_cast<Promise*>(memory))()); |
||||
} |
||||
|
||||
private: |
||||
GPR_NO_UNIQUE_ADDRESS PromiseFactory fn_; |
||||
GPR_NO_UNIQUE_ADDRESS CleanupFn cleanup_fn_; |
||||
}; |
||||
|
||||
template <typename Fn, typename CleanupFn> |
||||
Map* MakeMapToAdd(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { |
||||
using FactoryType = MapImpl<Fn, CleanupFn>; |
||||
promise_memory_required_ = std::max(promise_memory_required_, |
||||
sizeof(typename FactoryType::Promise)); |
||||
return GetContext<Arena>()->New<FactoryType>(std::move(fn), |
||||
std::move(cleanup_fn), from); |
||||
} |
||||
|
||||
void Append(Map* f) { |
||||
if (first_map_ == nullptr) { |
||||
first_map_ = f; |
||||
last_map_ = f; |
||||
} else { |
||||
last_map_->SetNext(f); |
||||
last_map_ = f; |
||||
} |
||||
} |
||||
|
||||
void Prepend(Map* f) { |
||||
if (first_map_ == nullptr) { |
||||
first_map_ = f; |
||||
last_map_ = f; |
||||
} else { |
||||
f->SetNext(first_map_); |
||||
first_map_ = f; |
||||
} |
||||
} |
||||
|
||||
void DeleteFactories() { |
||||
for (auto* f = first_map_; f != nullptr;) { |
||||
auto* next = f->next(); |
||||
f->~Map(); |
||||
f = next; |
||||
} |
||||
} |
||||
|
||||
// The first map in the chain.
|
||||
Map* first_map_ = nullptr; |
||||
// The last map in the chain.
|
||||
Map* last_map_ = nullptr; |
||||
// The amount of memory required to store the largest promise in the chain.
|
||||
size_t promise_memory_required_ = 0; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
|
@ -1,24 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_PROMISE_TRACE_H |
||||
#define GRPC_SRC_CORE_LIB_PROMISE_TRACE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
|
||||
extern grpc_core::DebugOnlyTraceFlag grpc_trace_promise_primitives; |
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_PROMISE_TRACE_H
|
@ -1,20 +0,0 @@ |
||||
// Copyright 2016 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/slice/slice_refcount.h" |
||||
|
||||
grpc_core::DebugOnlyTraceFlag grpc_slice_refcount_trace(false, |
||||
"slice_refcount"); |
File diff suppressed because it is too large
Load Diff
@ -1,144 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/promise/interceptor_list.h" |
||||
|
||||
#include <initializer_list> |
||||
#include <memory> |
||||
|
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
|
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "test/core/promise/test_context.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
class InterceptorListTest : public ::testing::Test { |
||||
protected: |
||||
MemoryAllocator memory_allocator_ = MemoryAllocator( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test")); |
||||
ScopedArenaPtr arena_ = MakeScopedArena(1024, &memory_allocator_); |
||||
TestContext<Arena> arena_ctx_{arena_.get()}; |
||||
}; |
||||
|
||||
TEST_F(InterceptorListTest, NoOp) { InterceptorList<std::string>(); } |
||||
|
||||
TEST_F(InterceptorListTest, CanRunOne) { |
||||
InterceptorList<std::string> list; |
||||
list.AppendMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloa")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunTwo) { |
||||
InterceptorList<std::string> list; |
||||
list.AppendMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
list.AppendMap([](std::string s) { return s + "b"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloab")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunTwoTwice) { |
||||
InterceptorList<std::string> list; |
||||
list.AppendMap([](std::string s) { return s + s; }, DEBUG_LOCATION); |
||||
list.AppendMap([](std::string s) { return s + s + s; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run(std::string(10, 'a'))()).value(), |
||||
std::string(60, 'a')); |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run(std::string(100, 'b'))()).value(), |
||||
std::string(600, 'b')); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunManyWithCaptures) { |
||||
InterceptorList<std::string> list; |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
list.AppendMap( |
||||
[i = std::make_shared<size_t>(i)](std::string s) { |
||||
return s + static_cast<char>((*i % 26) + 'a'); |
||||
}, |
||||
DEBUG_LOCATION); |
||||
} |
||||
std::string expected; |
||||
for (size_t i = 0; i < 1000; i++) { |
||||
expected += "abcdefghijklmnopqrstuvwxyz"; |
||||
} |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run("")()).value(), expected); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunOnePrepended) { |
||||
InterceptorList<std::string> list; |
||||
list.PrependMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloa")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunTwoPrepended) { |
||||
InterceptorList<std::string> list; |
||||
list.PrependMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
list.PrependMap([](std::string s) { return s + "b"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloba")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunManyWithCapturesPrepended) { |
||||
InterceptorList<std::string> list; |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
list.PrependMap( |
||||
[i = std::make_shared<size_t>(i)](std::string s) { |
||||
return s + static_cast<char>((*i % 26) + 'a'); |
||||
}, |
||||
DEBUG_LOCATION); |
||||
} |
||||
std::string expected; |
||||
for (size_t i = 0; i < 1000; i++) { |
||||
expected += "zyxwvutsrqponmlkjihgfedcba"; |
||||
} |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run("")()).value(), expected); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunManyWithCapturesThatDelay) { |
||||
InterceptorList<std::string> list; |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
list.AppendMap( |
||||
[i = std::make_shared<size_t>(i)](std::string s) { |
||||
return |
||||
[x = false, i, s]() mutable -> Poll<absl::optional<std::string>> { |
||||
if (!x) { |
||||
x = true; |
||||
return Pending{}; |
||||
} |
||||
return s + static_cast<char>((*i % 26) + 'a'); |
||||
}; |
||||
}, |
||||
DEBUG_LOCATION); |
||||
} |
||||
auto promise = list.Run(""); |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
EXPECT_TRUE(absl::holds_alternative<Pending>(promise())) << i; |
||||
} |
||||
std::string expected; |
||||
for (size_t i = 0; i < 1000; i++) { |
||||
expected += "abcdefghijklmnopqrstuvwxyz"; |
||||
} |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(promise()).value(), expected); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue