mirror of https://github.com/grpc/grpc.git
[promises] Server call (#31448)
* add experiment * allow instantiation * scratchings * scratchings * sniffly * Automated change: Fix sanity tests * fix * fix * fix * Automated change: Fix sanity tests * progress * change pipe labels to enable server code to be written * better api * Automated change: Fix sanity tests * progress * [promises] Implementation of deadline for server-based-calls * compression filter compiles again * Automated change: Fix sanity tests * fix * server tracing fixes * get client initial metadata * progress * progress * server call surface progress * Automated change: Fix sanity tests * move payload * server-progress * recv-message-server-connchan * logging * fix context-gate * recv fix@top * Automated change: Fix sanity tests * recv close on server * top termination start * [promises] Move Empty to be first class * fixes * fix * flow control fix * got to orphan! * orphan * call orphan * spam cleanup * fix * new cancelation semantics * progress * large metadata fixes * fix * fix * log * better logs * fix-chanz * logging, necessaryness * fix typo * fixes * fix * fix * fix-pipe * cleanup logging * fix * build-fix * fix * Automated change: Fix sanity tests * logging * Automated change: Fix sanity tests * Automated change: Fix sanity tests * better primitive * Revert "better primitive" This reverts commitpull/32094/head^2119b5ee244
. * fix * fix * trrracing * Automated change: Fix sanity tests * get-trailing-metadata * cancellation * Automated change: Fix sanity tests * add transform pipeline to pipe * add transform pipeline to pipe * interceptor lists * new server initial md api into filters * convert connected_channel * convert call * initial promise based filter conversion * convert promise based filter * build fixes * compile fix * fixes * fix ordering * fixes * check-metadata * revert later: debug code * better debug * fix metadata ordering with messages in promise based filter * fix ordering problem between batch completion and promise completion * properly handle failure on receive message path on client * more debug, fix a repoll bug in pbf * Automated change: Fix sanity tests * fixes * Automated change: Fix sanity tests * cleanup logging * fixes * missing file * fixes * logging * Automated change: Fix sanity tests * fixes * convert logging filter * fix * Automated change: Fix sanity tests * fix bad server response test * Revert "Disable logging test (#32049)" This reverts commit5fc92eaeae
. * fix * Automated change: Fix sanity tests * fix memory leaks, logging * Automated change: Fix sanity tests * slice refcount debugging * asan-canaries * leak-fix * leak-fix * Automated change: Fix sanity tests * fix * fix * fix * fix * fix * Automated change: Fix sanity tests * fix * remove mistaken line * add-comment * fix refcounting bug * Automated change: Fix sanity tests * rename variable * renames * bleh * carry pipe close status from bottom of pipe to top to appease recv-close-on-server * backport cancellation * Revert "carry pipe close status from bottom of pipe to top to appease" This reverts commitfa33301dcd
. * fix * Automated change: Fix sanity tests * review-feedback * comment-ordering * monostate * renames * undo-review-feedback * fix * review-feedback * review-feedback * fix * review-feedback * drop debugloc constructor * interceptor-list-rev-feedback * interceptor-list-rev-feedback * pipe test * review-feedback * undo-mistaken-change * Automated change: Fix sanity tests * pipe error state * detect send/recv failures and report * iwyu, build * fix submodules * fix * warning * cleanup * Automated change: Fix sanity tests * fix * fix for windows * fix * null pointer fix * iwyu * gen projex --------- Co-authored-by: ctiller <ctiller@users.noreply.github.com>
parent
03ea0bb28d
commit
bbeb15006a
85 changed files with 4960 additions and 1744 deletions
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,27 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/closure.h" |
||||
|
||||
#include "absl/strings/str_format.h" |
||||
|
||||
std::string grpc_closure::DebugString() const { |
||||
#ifdef NDEBUG |
||||
return absl::StrFormat("%p", this); |
||||
#else |
||||
return absl::StrFormat("%p|created=%s:%d", this, file_created, line_created); |
||||
#endif |
||||
} |
@ -0,0 +1,309 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H |
||||
#define GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <algorithm> |
||||
#include <new> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/types/optional.h" |
||||
#include "absl/types/variant.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/construct_destruct.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/detail/promise_factory.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/trace.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Tracks a list of maps of T -> optional<T> via promises.
|
||||
// When Run, runs each transformation in order, and resolves to the ulimate
|
||||
// result.
|
||||
// If a map resolves to nullopt, the chain is terminated and the result is
|
||||
// nullopt.
|
||||
// Maps can also have synchronous cleanup functions, which are guaranteed to be
|
||||
// called at the termination of each run through the chain.
|
||||
template <typename T> |
||||
class InterceptorList { |
||||
private: |
||||
// A map of T -> T via promises.
|
||||
class Map { |
||||
public: |
||||
explicit Map(DebugLocation from) : from_(from) {} |
||||
// Construct a promise to transform x into some other value at memory.
|
||||
virtual void MakePromise(T x, void* memory) = 0; |
||||
// Destroy a promise constructed at memory.
|
||||
virtual void Destroy(void* memory) = 0; |
||||
// Poll a promise constructed at memory.
|
||||
// Resolves to an optional<T> -- if nullopt it means terminate the chain and
|
||||
// resolve.
|
||||
virtual Poll<absl::optional<T>> PollOnce(void* memory) = 0; |
||||
virtual ~Map() = default; |
||||
|
||||
// Update the next pointer stored with this map.
|
||||
// This is only valid to call once, and only before the map is used.
|
||||
void SetNext(Map* next) { |
||||
GPR_DEBUG_ASSERT(next_ == nullptr); |
||||
next_ = next; |
||||
} |
||||
|
||||
// Access the creation location for this map (for debug tracing).
|
||||
DebugLocation from() const { return from_; } |
||||
|
||||
// Access the next map in the chain (or nullptr if this is the last map).
|
||||
Map* next() const { return next_; } |
||||
|
||||
private: |
||||
GPR_NO_UNIQUE_ADDRESS const DebugLocation from_; |
||||
Map* next_ = nullptr; |
||||
}; |
||||
|
||||
public: |
||||
// The result of Run: a promise that will execute the entire chain.
|
||||
class RunPromise { |
||||
public: |
||||
RunPromise(size_t memory_required, Map* factory, absl::optional<T> value) { |
||||
if (!value.has_value() || factory == nullptr) { |
||||
is_immediately_resolved_ = true; |
||||
Construct(&result_, std::move(value)); |
||||
} else { |
||||
is_immediately_resolved_ = false; |
||||
Construct(&async_resolution_, memory_required); |
||||
factory->MakePromise(std::move(*value), async_resolution_.space.get()); |
||||
async_resolution_.current_factory = factory; |
||||
} |
||||
} |
||||
|
||||
~RunPromise() { |
||||
if (is_immediately_resolved_) { |
||||
Destruct(&result_); |
||||
} else { |
||||
if (async_resolution_.current_factory != nullptr) { |
||||
async_resolution_.current_factory->Destroy( |
||||
async_resolution_.space.get()); |
||||
} |
||||
Destruct(&async_resolution_); |
||||
} |
||||
} |
||||
|
||||
RunPromise(const RunPromise&) = delete; |
||||
RunPromise& operator=(const RunPromise&) = delete; |
||||
|
||||
RunPromise(RunPromise&& other) noexcept |
||||
: is_immediately_resolved_(other.is_immediately_resolved_) { |
||||
if (is_immediately_resolved_) { |
||||
Construct(&result_, std::move(other.result_)); |
||||
} else { |
||||
Construct(&async_resolution_, std::move(other.async_resolution_)); |
||||
} |
||||
} |
||||
|
||||
RunPromise& operator=(RunPromise&& other) noexcept = delete; |
||||
|
||||
Poll<absl::optional<T>> operator()() { |
||||
if (grpc_trace_promise_primitives.enabled()) { |
||||
gpr_log(GPR_DEBUG, "InterceptorList::RunPromise: %s", |
||||
DebugString().c_str()); |
||||
} |
||||
if (is_immediately_resolved_) return std::move(result_); |
||||
while (true) { |
||||
auto r = async_resolution_.current_factory->PollOnce( |
||||
async_resolution_.space.get()); |
||||
if (auto* p = absl::get_if<kPollReadyIdx>(&r)) { |
||||
async_resolution_.current_factory->Destroy( |
||||
async_resolution_.space.get()); |
||||
async_resolution_.current_factory = |
||||
async_resolution_.current_factory->next(); |
||||
if (async_resolution_.current_factory == nullptr || !p->has_value()) { |
||||
return std::move(*p); |
||||
} |
||||
async_resolution_.current_factory->MakePromise( |
||||
std::move(**p), async_resolution_.space.get()); |
||||
continue; |
||||
} |
||||
return Pending{}; |
||||
} |
||||
} |
||||
|
||||
private: |
||||
std::string DebugString() const { |
||||
if (is_immediately_resolved_) { |
||||
return absl::StrFormat("Result:has_value:%d", result_.has_value()); |
||||
} else { |
||||
return absl::StrCat( |
||||
"Running:", |
||||
async_resolution_.current_factory == nullptr |
||||
? "END" |
||||
: ([p = async_resolution_.current_factory->from()]() { |
||||
return absl::StrCat(p.file(), ":", p.line()); |
||||
})() |
||||
.c_str()); |
||||
} |
||||
} |
||||
struct AsyncResolution { |
||||
explicit AsyncResolution(size_t max_size) |
||||
: space(GetContext<Arena>()->MakePooledArray<char>(max_size)) {} |
||||
AsyncResolution(const AsyncResolution&) = delete; |
||||
AsyncResolution& operator=(const AsyncResolution&) = delete; |
||||
AsyncResolution(AsyncResolution&& other) noexcept |
||||
: current_factory(std::exchange(other.current_factory, nullptr)), |
||||
space(std::move(other.space)) {} |
||||
Map* current_factory; |
||||
Arena::PoolPtr<char[]> space; |
||||
}; |
||||
union { |
||||
AsyncResolution async_resolution_; |
||||
absl::optional<T> result_; |
||||
}; |
||||
// If true, the result_ union is valid, otherwise async_resolution_ is.
|
||||
// Indicates whether the promise resolved immediately at construction or if
|
||||
// additional steps were needed.
|
||||
bool is_immediately_resolved_; |
||||
}; |
||||
|
||||
InterceptorList() = default; |
||||
InterceptorList(const InterceptorList&) = delete; |
||||
InterceptorList& operator=(const InterceptorList&) = delete; |
||||
~InterceptorList() { DeleteFactories(); } |
||||
|
||||
RunPromise Run(absl::optional<T> initial_value) { |
||||
return RunPromise(promise_memory_required_, first_map_, |
||||
std::move(initial_value)); |
||||
} |
||||
|
||||
// Append a new map to the end of the chain.
|
||||
template <typename Fn> |
||||
void AppendMap(Fn fn, DebugLocation from) { |
||||
Append(MakeMapToAdd( |
||||
std::move(fn), [] {}, from)); |
||||
} |
||||
|
||||
// Prepend a new map to the beginning of the chain.
|
||||
template <typename Fn> |
||||
void PrependMap(Fn fn, DebugLocation from) { |
||||
Prepend(MakeMapToAdd( |
||||
std::move(fn), [] {}, from)); |
||||
} |
||||
|
||||
// Append a new map to the end of the chain, with a cleanup function to be
|
||||
// called at the end of run promise execution.
|
||||
template <typename Fn, typename CleanupFn> |
||||
void AppendMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { |
||||
Append(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); |
||||
} |
||||
|
||||
// Prepend a new map to the beginning of the chain, with a cleanup function to
|
||||
// be called at the end of run promise execution.
|
||||
template <typename Fn, typename CleanupFn> |
||||
void PrependMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { |
||||
Prepend(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); |
||||
} |
||||
|
||||
protected: |
||||
// Clear the interceptor list
|
||||
void ResetInterceptorList() { |
||||
DeleteFactories(); |
||||
first_map_ = nullptr; |
||||
last_map_ = nullptr; |
||||
promise_memory_required_ = 0; |
||||
} |
||||
|
||||
private: |
||||
template <typename Fn, typename CleanupFn> |
||||
class MapImpl final : public Map { |
||||
public: |
||||
using PromiseFactory = promise_detail::RepeatedPromiseFactory<T, Fn>; |
||||
using Promise = typename PromiseFactory::Promise; |
||||
|
||||
explicit MapImpl(Fn fn, CleanupFn cleanup_fn, DebugLocation from) |
||||
: Map(from), fn_(std::move(fn)), cleanup_fn_(std::move(cleanup_fn)) {} |
||||
~MapImpl() override { cleanup_fn_(); } |
||||
void MakePromise(T x, void* memory) override { |
||||
new (memory) Promise(fn_.Make(std::move(x))); |
||||
} |
||||
void Destroy(void* memory) override { |
||||
static_cast<Promise*>(memory)->~Promise(); |
||||
} |
||||
Poll<absl::optional<T>> PollOnce(void* memory) override { |
||||
return poll_cast<absl::optional<T>>((*static_cast<Promise*>(memory))()); |
||||
} |
||||
|
||||
private: |
||||
GPR_NO_UNIQUE_ADDRESS PromiseFactory fn_; |
||||
GPR_NO_UNIQUE_ADDRESS CleanupFn cleanup_fn_; |
||||
}; |
||||
|
||||
template <typename Fn, typename CleanupFn> |
||||
Map* MakeMapToAdd(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { |
||||
using FactoryType = MapImpl<Fn, CleanupFn>; |
||||
promise_memory_required_ = std::max(promise_memory_required_, |
||||
sizeof(typename FactoryType::Promise)); |
||||
return GetContext<Arena>()->New<FactoryType>(std::move(fn), |
||||
std::move(cleanup_fn), from); |
||||
} |
||||
|
||||
void Append(Map* f) { |
||||
if (first_map_ == nullptr) { |
||||
first_map_ = f; |
||||
last_map_ = f; |
||||
} else { |
||||
last_map_->SetNext(f); |
||||
last_map_ = f; |
||||
} |
||||
} |
||||
|
||||
void Prepend(Map* f) { |
||||
if (first_map_ == nullptr) { |
||||
first_map_ = f; |
||||
last_map_ = f; |
||||
} else { |
||||
f->SetNext(first_map_); |
||||
first_map_ = f; |
||||
} |
||||
} |
||||
|
||||
void DeleteFactories() { |
||||
for (auto* f = first_map_; f != nullptr;) { |
||||
auto* next = f->next(); |
||||
f->~Map(); |
||||
f = next; |
||||
} |
||||
} |
||||
|
||||
// The first map in the chain.
|
||||
Map* first_map_ = nullptr; |
||||
// The last map in the chain.
|
||||
Map* last_map_ = nullptr; |
||||
// The amount of memory required to store the largest promise in the chain.
|
||||
size_t promise_memory_required_ = 0; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
|
@ -0,0 +1,24 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_PROMISE_TRACE_H |
||||
#define GRPC_SRC_CORE_LIB_PROMISE_TRACE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
|
||||
extern grpc_core::DebugOnlyTraceFlag grpc_trace_promise_primitives; |
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_PROMISE_TRACE_H
|
@ -0,0 +1,20 @@ |
||||
// Copyright 2016 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/slice/slice_refcount.h" |
||||
|
||||
grpc_core::DebugOnlyTraceFlag grpc_slice_refcount_trace(false, |
||||
"slice_refcount"); |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,144 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/promise/interceptor_list.h" |
||||
|
||||
#include <initializer_list> |
||||
#include <memory> |
||||
|
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
|
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "test/core/promise/test_context.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
class InterceptorListTest : public ::testing::Test { |
||||
protected: |
||||
MemoryAllocator memory_allocator_ = MemoryAllocator( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test")); |
||||
ScopedArenaPtr arena_ = MakeScopedArena(1024, &memory_allocator_); |
||||
TestContext<Arena> arena_ctx_{arena_.get()}; |
||||
}; |
||||
|
||||
TEST_F(InterceptorListTest, NoOp) { InterceptorList<std::string>(); } |
||||
|
||||
TEST_F(InterceptorListTest, CanRunOne) { |
||||
InterceptorList<std::string> list; |
||||
list.AppendMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloa")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunTwo) { |
||||
InterceptorList<std::string> list; |
||||
list.AppendMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
list.AppendMap([](std::string s) { return s + "b"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloab")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunTwoTwice) { |
||||
InterceptorList<std::string> list; |
||||
list.AppendMap([](std::string s) { return s + s; }, DEBUG_LOCATION); |
||||
list.AppendMap([](std::string s) { return s + s + s; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run(std::string(10, 'a'))()).value(), |
||||
std::string(60, 'a')); |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run(std::string(100, 'b'))()).value(), |
||||
std::string(600, 'b')); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunManyWithCaptures) { |
||||
InterceptorList<std::string> list; |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
list.AppendMap( |
||||
[i = std::make_shared<size_t>(i)](std::string s) { |
||||
return s + static_cast<char>((*i % 26) + 'a'); |
||||
}, |
||||
DEBUG_LOCATION); |
||||
} |
||||
std::string expected; |
||||
for (size_t i = 0; i < 1000; i++) { |
||||
expected += "abcdefghijklmnopqrstuvwxyz"; |
||||
} |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run("")()).value(), expected); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunOnePrepended) { |
||||
InterceptorList<std::string> list; |
||||
list.PrependMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloa")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunTwoPrepended) { |
||||
InterceptorList<std::string> list; |
||||
list.PrependMap([](std::string s) { return s + "a"; }, DEBUG_LOCATION); |
||||
list.PrependMap([](std::string s) { return s + "b"; }, DEBUG_LOCATION); |
||||
EXPECT_EQ(list.Run("hello")(), Poll<absl::optional<std::string>>("helloba")); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunManyWithCapturesPrepended) { |
||||
InterceptorList<std::string> list; |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
list.PrependMap( |
||||
[i = std::make_shared<size_t>(i)](std::string s) { |
||||
return s + static_cast<char>((*i % 26) + 'a'); |
||||
}, |
||||
DEBUG_LOCATION); |
||||
} |
||||
std::string expected; |
||||
for (size_t i = 0; i < 1000; i++) { |
||||
expected += "zyxwvutsrqponmlkjihgfedcba"; |
||||
} |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(list.Run("")()).value(), expected); |
||||
} |
||||
|
||||
TEST_F(InterceptorListTest, CanRunManyWithCapturesThatDelay) { |
||||
InterceptorList<std::string> list; |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
list.AppendMap( |
||||
[i = std::make_shared<size_t>(i)](std::string s) { |
||||
return |
||||
[x = false, i, s]() mutable -> Poll<absl::optional<std::string>> { |
||||
if (!x) { |
||||
x = true; |
||||
return Pending{}; |
||||
} |
||||
return s + static_cast<char>((*i % 26) + 'a'); |
||||
}; |
||||
}, |
||||
DEBUG_LOCATION); |
||||
} |
||||
auto promise = list.Run(""); |
||||
for (size_t i = 0; i < 26 * 1000; i++) { |
||||
EXPECT_TRUE(absl::holds_alternative<Pending>(promise())) << i; |
||||
} |
||||
std::string expected; |
||||
for (size_t i = 0; i < 1000; i++) { |
||||
expected += "abcdefghijklmnopqrstuvwxyz"; |
||||
} |
||||
EXPECT_EQ(absl::get<kPollReadyIdx>(promise()).value(), expected); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue