[promises] Remove TryConcurrently - it was the wrong idea (#32298)

pull/32305/head
Craig Tiller 2 years ago committed by GitHub
parent 860947605b
commit c9b47f8584
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 70
      CMakeLists.txt
  2. 93
      build_autogenerated.yaml
  3. 29
      src/core/BUILD
  4. 370
      src/core/lib/promise/try_concurrently.h
  5. 15
      test/core/promise/BUILD
  6. 194
      test/core/promise/try_concurrently_test.cc
  7. 24
      tools/run_tests/generated/tests.json

70
CMakeLists.txt generated

@ -1252,7 +1252,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx transport_security_common_api_test)
add_dependencies(buildtests_cxx transport_security_test)
add_dependencies(buildtests_cxx transport_stream_receiver_test)
add_dependencies(buildtests_cxx try_concurrently_test)
add_dependencies(buildtests_cxx try_join_test)
add_dependencies(buildtests_cxx try_seq_metadata_test)
add_dependencies(buildtests_cxx try_seq_test)
@ -21579,75 +21578,6 @@ target_link_libraries(transport_stream_receiver_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(try_concurrently_test
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/debug/trace.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/closure.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/trace.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
test/core/promise/try_concurrently_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(try_concurrently_test PUBLIC cxx_std_14)
target_include_directories(try_concurrently_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(try_concurrently_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
absl::utility
gpr
upb
)
endif()
if(gRPC_BUILD_TESTS)

@ -12210,99 +12210,6 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: try_concurrently_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/debug/trace.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/time.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/detail/switch.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/interceptor_list.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/map_pipe.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_concurrently.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/periodic_update.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/closure.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/try_concurrently_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- absl/utility:utility
- gpr
- upb
uses_polling: false
- name: try_join_test
gtest: true
build: test

@ -379,35 +379,6 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "try_concurrently",
hdrs = ["lib/promise/try_concurrently.h"],
external_deps = [
"absl/status",
"absl/strings",
"absl/types:variant",
],
language = "c++",
public_hdrs = [
"lib/promise/map_pipe.h",
],
deps = [
"activity",
"construct_destruct",
"for_each",
"map",
"pipe",
"poll",
"promise_factory",
"promise_like",
"promise_status",
"promise_trace",
"try_seq",
"//:gpr",
"//:grpc_trace",
],
)
grpc_cc_library(
name = "map_pipe",
external_deps = ["absl/status"],

@ -1,370 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H
#define GRPC_SRC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <cstdint>
#include <string>
#include <type_traits>
#include <utility>
#include "absl/strings/str_cat.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/promise_like.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/trace.h"
namespace grpc_core {
namespace promise_detail {
template <typename Promise>
struct Necessary {
PromiseLike<Promise> promise;
static constexpr bool must_complete() { return true; }
};
template <typename Promise>
struct Helper {
PromiseLike<Promise> promise;
static constexpr bool must_complete() { return false; }
};
// A set of promises that can be polled concurrently.
// Fuses them when completed (that is, destroys the promise and records it
// completed).
// Relies on an external bit field to handle the recording: this saves a bunch
// of space, but means the implementation of this type is weird: it's really
// super tied to TryConcurrently and no attempt should be made to use this
// independently.
template <typename... Ts>
class FusedSet;
template <typename T, typename... Ts>
class FusedSet<T, Ts...> : public FusedSet<Ts...> {
public:
explicit FusedSet(T&& x, Ts&&... xs)
: FusedSet<Ts...>(std::forward<T>(xs)...) {
Construct(&wrapper_, std::forward<T>(x));
}
explicit FusedSet(T&& x, FusedSet<Ts...>&& xs)
: FusedSet<Ts...>(std::forward<FusedSet<Ts...>>(xs)) {
Construct(&wrapper_, std::forward<T>(x));
}
// Empty destructor: consumers must call Destroy() to ensure cleanup occurs
~FusedSet() {}
FusedSet(const FusedSet&) = delete;
FusedSet& operator=(const FusedSet&) = delete;
// Assumes all 'done_bits' for other are 0 and will be set to 1
FusedSet(FusedSet&& other) noexcept : FusedSet<Ts...>(std::move(other)) {
Construct(&wrapper_, std::move(other.wrapper_));
Destruct(&other.wrapper_);
}
static constexpr size_t Size() { return 1 + sizeof...(Ts); }
static constexpr uint8_t NecessaryBits() {
return (T::must_complete() ? 1 : 0) |
(FusedSet<Ts...>::NecessaryBits() << 1);
}
template <int kDoneBit>
void Destroy(uint8_t done_bits) {
if ((done_bits & (1 << kDoneBit)) == 0) {
Destruct(&wrapper_);
}
FusedSet<Ts...>::template Destroy<kDoneBit + 1>(done_bits);
}
template <typename Result, int kDoneBit>
Poll<Result> Run(uint8_t& done_bits) {
if ((done_bits & (1 << kDoneBit)) == 0) {
auto p = wrapper_.promise();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
done_bits |= (1 << kDoneBit);
Destruct(&wrapper_);
if (!IsStatusOk(*status)) {
return StatusCast<Result>(std::move(*status));
}
}
}
return FusedSet<Ts...>::template Run<Result, kDoneBit + 1>(done_bits);
}
template <typename P>
FusedSet<P, T, Ts...> With(P x) {
return FusedSet<P, T, Ts...>(std::move(x), std::move(*this));
}
private:
union {
T wrapper_;
};
};
template <>
class FusedSet<> {
public:
static constexpr size_t Size() { return 0; }
static constexpr uint8_t NecessaryBits() { return 0; }
template <typename Result, int kDoneBit>
Poll<Result> Run(uint8_t) {
return Pending{};
}
template <int kDoneBit>
void Destroy(uint8_t) {}
template <typename P>
FusedSet<P> With(P x) {
return FusedSet<P>(std::move(x));
}
};
template <typename Main, typename PreMain, typename PostMain>
class TryConcurrently {
public:
TryConcurrently(Main main, PreMain pre_main, PostMain post_main)
: done_bits_(0),
pre_main_(std::move(pre_main)),
post_main_(std::move(post_main)) {
Construct(&main_, std::move(main));
}
TryConcurrently(const TryConcurrently&) = delete;
TryConcurrently& operator=(const TryConcurrently&) = delete;
TryConcurrently(TryConcurrently&& other) noexcept
: done_bits_(0),
pre_main_(std::move(other.pre_main_)),
post_main_(std::move(other.post_main_)) {
GPR_DEBUG_ASSERT(other.done_bits_ == 0);
other.done_bits_ = HelperBits();
Construct(&main_, std::move(other.main_));
}
TryConcurrently& operator=(TryConcurrently&& other) noexcept {
GPR_DEBUG_ASSERT(other.done_bits_ == 0);
done_bits_ = 0;
other.done_bits_ = HelperBits();
pre_main_ = std::move(other.pre_main_);
post_main_ = std::move(other.post_main_);
Construct(&main_, std::move(other.main_));
return *this;
}
~TryConcurrently() {
if (done_bits_ & 1) {
Destruct(&result_);
} else {
Destruct(&main_);
}
pre_main_.template Destroy<1>(done_bits_);
post_main_.template Destroy<1 + PreMain::Size()>(done_bits_);
}
using Result =
typename PollTraits<decltype(std::declval<PromiseLike<Main>>()())>::Type;
Poll<Result> operator()() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%sBEGIN POLL: done_bits=%x necessary_bits=%x",
DebugTag().c_str(), done_bits_, NecessaryBits());
}
auto r = pre_main_.template Run<Result, 1>(done_bits_);
if (auto* status = absl::get_if<Result>(&r)) {
GPR_DEBUG_ASSERT(!IsStatusOk(*status));
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG,
"%sFAIL POLL PRE-MAIN: done_bits=%x necessary_bits=%x",
DebugTag().c_str(), done_bits_, NecessaryBits());
}
return std::move(*status);
}
if ((done_bits_ & 1) == 0) {
auto p = main_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
done_bits_ |= 1;
Destruct(&main_);
Construct(&result_, std::move(*status));
}
}
r = post_main_.template Run<Result, 1 + PreMain::Size()>(done_bits_);
if (auto* status = absl::get_if<Result>(&r)) {
GPR_DEBUG_ASSERT(!IsStatusOk(*status));
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG,
"%sFAIL POLL POST-MAIN: done_bits=%x necessary_bits=%x",
DebugTag().c_str(), done_bits_, NecessaryBits());
}
return std::move(*status);
}
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%sEND POLL: done_bits=%x necessary_bits=%x",
DebugTag().c_str(), done_bits_, NecessaryBits());
}
if ((done_bits_ & NecessaryBits()) == NecessaryBits()) {
return std::move(result_);
}
return Pending{};
}
template <typename P>
auto NecessaryPush(P p);
template <typename P>
auto NecessaryPull(P p);
template <typename P>
auto Push(P p);
template <typename P>
auto Pull(P p);
private:
// Bitmask for done_bits_ specifying which promises must be completed prior to
// returning ok.
constexpr uint8_t NecessaryBits() {
return 1 | (PreMain::NecessaryBits() << 1) |
(PostMain::NecessaryBits() << (1 + PreMain::Size()));
}
// Bitmask for done_bits_ specifying what all of the promises being complete
// would look like.
constexpr uint8_t AllBits() {
return (1 << (1 + PreMain::Size() + PostMain::Size())) - 1;
}
// Bitmask of done_bits_ specifying which bits correspond to helper promises -
// that is all promises that are not the main one.
constexpr uint8_t HelperBits() { return AllBits() ^ 1; }
std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " TRY_CONCURRENTLY[0x",
reinterpret_cast<uintptr_t>(this), "]: ");
}
// done_bits signifies which operations have completed.
// Bit 0 is set if main_ has completed.
// The next higher bits correspond one per pre-main promise.
// The next higher bits correspond one per post-main promise.
// So, going from most significant bit to least significant:
// +--------------+-------------+--------+
// |post_main bits|pre_main bits|main bit|
// +--------------+-------------+--------+
uint8_t done_bits_;
PreMain pre_main_;
union {
PromiseLike<Main> main_;
Result result_;
};
PostMain post_main_;
};
template <typename Main, typename PreMain, typename PostMain>
auto MakeTryConcurrently(Main&& main, PreMain&& pre_main,
PostMain&& post_main) {
return TryConcurrently<Main, PreMain, PostMain>(
std::forward<Main>(main), std::forward<PreMain>(pre_main),
std::forward<PostMain>(post_main));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::NecessaryPush(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_),
pre_main_.With(Necessary<P>{std::move(p)}),
std::move(post_main_));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::NecessaryPull(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_), std::move(pre_main_),
post_main_.With(Necessary<P>{std::move(p)}));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::Push(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_),
pre_main_.With(Helper<P>{std::move(p)}),
std::move(post_main_));
}
template <typename Main, typename PreMain, typename PostMain>
template <typename P>
auto TryConcurrently<Main, PreMain, PostMain>::Pull(P p) {
GPR_DEBUG_ASSERT(done_bits_ == 0);
done_bits_ = HelperBits();
return MakeTryConcurrently(std::move(main_), std::move(pre_main_),
post_main_.With(Helper<P>{std::move(p)}));
}
} // namespace promise_detail
// TryConcurrently runs a set of promises concurrently.
// There is a structure to the promises:
// - A 'main' promise dominates the others - it must complete before the
// overall promise successfully completes. Its result is chosen in the event
// of successful completion.
// - A set of (optional) push and pull promises to aid main. Push promises are
// polled before main, pull promises are polled after. In this way we can
// avoid overall wakeup churn - sending a message will tend to push things
// down the promise tree as its polled, so that send should be in a push
// promise - then as the main promise is polled and it calls into things
// lower in the stack they'll already see things there (this reasoning holds
// for receiving things and the pull promises too!).
// - Each push and pull promise is either necessary or optional.
// Necessary promises must complete successfully before the overall promise
// completes. Optional promises will just be cancelled once the main promise
// completes and any necessary helpers.
// - If any of the promises fail, the overall promise fails immediately.
// API:
// This function, TryConcurrently, is used to create a TryConcurrently promise.
// It takes a single argument, being the main promise. That promise also has
// a set of methods for attaching push and pull promises. The act of attachment
// returns a new TryConcurrently promise with previous contained promises moved
// out.
// The methods exposed:
// - Push, NecessaryPush: attach a push promise (with the first variant being
// optional, the second necessary).
// - Pull, NecessaryPull: attach a pull promise, with variants as above.
// Example:
// TryConcurrently(call_next_filter(std::move(call_args)))
// .Push(send_messages_promise)
// .Pull(recv_messages_promise)
template <typename Main>
auto TryConcurrently(Main main) {
return promise_detail::MakeTryConcurrently(std::move(main),
promise_detail::FusedSet<>(),
promise_detail::FusedSet<>());
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_TRY_CONCURRENTLY_H

@ -512,18 +512,3 @@ grpc_cc_test(
"//test/core/event_engine:mock_event_engine",
],
)
grpc_cc_test(
name = "try_concurrently_test",
srcs = ["try_concurrently_test.cc"],
external_deps = [
"absl/status",
"absl/strings",
"gtest",
],
language = "c++",
tags = ["promise_test"],
uses_event_engine = False,
uses_polling = False,
deps = ["//src/core:try_concurrently"],
)

@ -1,194 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/try_concurrently.h"
#include <algorithm>
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
namespace grpc_core {
class PromiseFactory {
public:
// Create a promise that resolves to Ok but has a memory allocation (to verify
// destruction)
// tag should have static lifetime (i.e. pass a string literal here).
auto OkPromise(absl::string_view tag) {
return [this, tag,
p = std::make_unique<absl::Status>(absl::OkStatus())]() mutable {
order_.push_back(tag);
return std::move(*p);
};
}
// Create a promise that never resolves and carries a memory allocation
// tag should have static lifetime (i.e. pass a string literal here).
auto NeverPromise(absl::string_view tag) {
return
[this, tag, p = std::make_unique<Pending>()]() -> Poll<absl::Status> {
order_.push_back(tag);
return *p;
};
}
// Create a promise that fails and carries a memory allocation
// tag should have static lifetime (i.e. pass a string literal here).
auto FailPromise(absl::string_view tag) {
return [this, p = std::make_unique<absl::Status>(absl::UnknownError(tag)),
tag]() mutable {
order_.push_back(tag);
return std::move(*p);
};
}
// Finish one round and return a vector of strings representing which promises
// were polled and in which order.
std::vector<absl::string_view> Finish() { return std::exchange(order_, {}); }
private:
std::vector<absl::string_view> order_;
};
std::ostream& operator<<(std::ostream& out, const Poll<absl::Status>& p) {
return out << PollToString(
p, [](const absl::Status& s) { return s.ToString(); });
}
TEST(TryConcurrentlyTest, Immediate) {
PromiseFactory pf;
auto a = TryConcurrently(pf.OkPromise("1"));
EXPECT_EQ(a(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"1"}));
auto b = TryConcurrently(pf.OkPromise("1")).NecessaryPush(pf.OkPromise("2"));
EXPECT_EQ(b(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"2", "1"}));
auto c = TryConcurrently(pf.OkPromise("1")).NecessaryPull(pf.OkPromise("2"));
EXPECT_EQ(c(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"1", "2"}));
auto d = TryConcurrently(pf.OkPromise("1"))
.NecessaryPull(pf.OkPromise("2"))
.NecessaryPush(pf.OkPromise("3"));
EXPECT_EQ(d(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"3", "1", "2"}));
auto e = TryConcurrently(pf.OkPromise("1")).Push(pf.NeverPromise("2"));
EXPECT_EQ(e(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"2", "1"}));
auto f = TryConcurrently(pf.OkPromise("1")).Pull(pf.NeverPromise("2"));
EXPECT_EQ(f(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"1", "2"}));
}
TEST(TryConcurrentlyTest, Paused) {
PromiseFactory pf;
auto a = TryConcurrently(pf.NeverPromise("1"));
EXPECT_EQ(a(), Poll<absl::Status>(Pending{}));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"1"}));
auto b =
TryConcurrently(pf.OkPromise("1")).NecessaryPush(pf.NeverPromise("2"));
EXPECT_EQ(b(), Poll<absl::Status>(Pending{}));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"2", "1"}));
auto c =
TryConcurrently(pf.OkPromise("1")).NecessaryPull(pf.NeverPromise("2"));
EXPECT_EQ(c(), Poll<absl::Status>(Pending{}));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"1", "2"}));
}
TEST(TryConcurrentlyTest, OneFailed) {
PromiseFactory pf;
auto a = TryConcurrently(pf.FailPromise("bah"));
EXPECT_EQ(a(), Poll<absl::Status>(absl::UnknownError("bah")));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"bah"}));
auto b = TryConcurrently(pf.NeverPromise("1"))
.NecessaryPush(pf.FailPromise("humbug"));
EXPECT_EQ(b(), Poll<absl::Status>(absl::UnknownError("humbug")));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"humbug"}));
auto c = TryConcurrently(pf.NeverPromise("1"))
.NecessaryPull(pf.FailPromise("wha"));
EXPECT_EQ(c(), Poll<absl::Status>(absl::UnknownError("wha")));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>({"1", "wha"}));
}
TEST(TryConcurrently, MuchPush) {
PromiseFactory pf;
auto p = TryConcurrently(pf.OkPromise("1"))
.NecessaryPush(pf.OkPromise("2"))
.NecessaryPush(pf.OkPromise("3"))
.NecessaryPush(pf.OkPromise("4"))
.NecessaryPush(pf.OkPromise("5"))
.NecessaryPush(pf.OkPromise("6"))
.NecessaryPush(pf.OkPromise("7"))
.NecessaryPush(pf.OkPromise("8"));
EXPECT_EQ(p(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>(
{"8", "7", "6", "5", "4", "3", "2", "1"}));
}
TEST(TryConcurrently, SoPull) {
PromiseFactory pf;
auto p = TryConcurrently(pf.OkPromise("1"))
.NecessaryPull(pf.OkPromise("2"))
.NecessaryPull(pf.OkPromise("3"))
.NecessaryPull(pf.OkPromise("4"))
.NecessaryPull(pf.OkPromise("5"))
.NecessaryPull(pf.OkPromise("6"))
.NecessaryPull(pf.OkPromise("7"))
.NecessaryPull(pf.OkPromise("8"));
EXPECT_EQ(p(), Poll<absl::Status>(absl::OkStatus()));
EXPECT_EQ(pf.Finish(), std::vector<absl::string_view>(
{"1", "8", "7", "6", "5", "4", "3", "2"}));
}
// A pointer to an int designed to cause a double free if it's double destructed
// (to flush out bugs)
class ProblematicPointer {
public:
ProblematicPointer() : p_(new int(0)) {}
~ProblematicPointer() { delete p_; }
ProblematicPointer(const ProblematicPointer&) = delete;
ProblematicPointer& operator=(const ProblematicPointer&) = delete;
// NOLINTNEXTLINE: we want to allocate during move
ProblematicPointer(ProblematicPointer&& other) : p_(new int(*other.p_ + 1)) {}
ProblematicPointer& operator=(ProblematicPointer&& other) = delete;
private:
int* p_;
};
TEST(TryConcurrentlyTest, MoveItMoveIt) {
auto a =
TryConcurrently([x = ProblematicPointer()]() { return absl::OkStatus(); })
.NecessaryPull(
[x = ProblematicPointer()]() { return absl::OkStatus(); })
.NecessaryPush(
[x = ProblematicPointer()]() { return absl::OkStatus(); })
.Push([x = ProblematicPointer()]() { return absl::OkStatus(); })
.Pull([x = ProblematicPointer()]() { return absl::OkStatus(); });
auto b = std::move(a);
auto c = std::move(b);
c();
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -8049,30 +8049,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "try_concurrently_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save