Call push/pull abstraction (#29033)

* push/pull impl

* comment

* fix

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/29089/head
Craig Tiller 3 years ago committed by GitHub
parent b7900977b5
commit 736441f288
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      BUILD
  2. 38
      CMakeLists.txt
  3. 19
      build_autogenerated.yaml
  4. 144
      src/core/lib/promise/call_push_pull.h
  5. 15
      test/core/promise/BUILD
  6. 76
      test/core/promise/call_push_pull_test.cc
  7. 24
      tools/run_tests/generated/tests.json

13
BUILD

@ -1056,6 +1056,19 @@ grpc_cc_library(
deps = ["gpr_platform"],
)
grpc_cc_library(
name = "call_push_pull",
hdrs = ["src/core/lib/promise/call_push_pull.h"],
language = "c++",
deps = [
"bitset",
"construct_destruct",
"poll",
"promise_like",
"promise_status",
],
)
grpc_cc_library(
name = "context",
language = "c++",

38
CMakeLists.txt generated

@ -799,6 +799,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx byte_buffer_test)
add_dependencies(buildtests_cxx byte_stream_test)
add_dependencies(buildtests_cxx call_finalization_test)
add_dependencies(buildtests_cxx call_push_pull_test)
add_dependencies(buildtests_cxx cancel_ares_query_test)
add_dependencies(buildtests_cxx capture_test)
add_dependencies(buildtests_cxx cel_authorization_engine_test)
@ -8216,6 +8217,43 @@ target_link_libraries(call_finalization_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(call_push_pull_test
test/core/promise/call_push_pull_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(call_push_pull_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(call_push_pull_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::status
absl::statusor
absl::variant
)
endif()
if(gRPC_BUILD_TESTS)

@ -4750,6 +4750,25 @@ targets:
- test/core/channel/call_finalization_test.cc
deps:
- grpc_test_util
- name: call_push_pull_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/gpr/useful.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/promise/call_push_pull.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/poll.h
src:
- test/core/promise/call_push_pull_test.cc
deps:
- absl/status:status
- absl/status:statusor
- absl/types:variant
uses_polling: false
- name: cancel_ares_query_test
gtest: true
build: test

@ -0,0 +1,144 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H
#define GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H
#include <grpc/support/port_platform.h>
#include <assert.h>
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/promise/detail/promise_like.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
namespace promise_detail {
template <typename FMain, typename FPush, typename FPull>
class CallPushPull {
public:
CallPushPull(FMain f_main, FPush f_push, FPull f_pull)
: push_(std::move(f_push)), pull_(std::move(f_pull)) {
Construct(&main_, std::move(f_main));
}
CallPushPull(const CallPushPull&) = delete;
CallPushPull& operator=(const CallPushPull&) = delete;
CallPushPull(CallPushPull&& other) noexcept
: done_(other.done_),
push_(std::move(other.push_)),
pull_(std::move(other.pull_)) {
assert(!done_.is_set(kDoneMain));
Construct(&main_, std::move(other.main_));
}
CallPushPull& operator=(CallPushPull&& other) noexcept {
assert(!done_.is_set(kDoneMain));
done_ = other.done_;
assert(!done_.is_set(kDoneMain));
push_ = std::move(other.push_);
main_ = std::move(other.main_);
pull_ = std::move(other.pull_);
return *this;
}
~CallPushPull() {
if (done_.is_set(kDoneMain)) {
Destruct(&result_);
} else {
Destruct(&main_);
}
}
using Result =
typename PollTraits<decltype(std::declval<PromiseLike<FMain>>()())>::Type;
Poll<Result> operator()() {
if (!done_.is_set(kDonePush)) {
auto p = push_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
if (IsStatusOk(*status)) {
done_.set(kDonePush);
} else {
return std::move(*status);
}
}
}
if (!done_.is_set(kDoneMain)) {
auto p = main_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
if (IsStatusOk(*status)) {
done_.set(kDoneMain);
Destruct(&main_);
Construct(&result_, std::move(*status));
} else {
return std::move(*status);
}
}
}
if (!done_.is_set(kDonePull)) {
auto p = pull_();
if (auto* status = absl::get_if<kPollReadyIdx>(&p)) {
if (IsStatusOk(*status)) {
done_.set(kDonePull);
} else {
return std::move(*status);
}
}
}
if (done_.all()) return std::move(result_);
return Pending{};
}
private:
enum { kDonePull = 0, kDoneMain = 1, kDonePush = 2 };
BitSet<3> done_;
GPR_NO_UNIQUE_ADDRESS PromiseLike<FPush> push_;
union {
PromiseLike<FMain> main_;
Result result_;
};
GPR_NO_UNIQUE_ADDRESS PromiseLike<FPull> pull_;
};
} // namespace promise_detail
// For promises representing calls a common pattern emerges:
// There's a process pushing data down the stack, a process handling the main
// call part, and a process pulling data back up the stack.
//
// This can reasonably be represented by the right combinations of TryJoins and
// Maps, but since the structure is fundamental to the domain we introduce
// this simple helper to make it easier to write the common case.
//
// It takes three promises: the main call, the push and the pull.
// When polling, the push is polled first, then the main call (descending the
// stack), then the pull (as we ascend once more).
//
// This strategy minimizes repolls.
template <typename FMain, typename FPush, typename FPull>
promise_detail::CallPushPull<FMain, FPush, FPull> CallPushPull(FMain f_main,
FPush f_push,
FPull f_pull) {
return promise_detail::CallPushPull<FMain, FPush, FPull>(
std::move(f_main), std::move(f_push), std::move(f_pull));
}
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_CALL_PUSH_PULL_H

@ -359,3 +359,18 @@ grpc_cc_test(
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "call_push_pull_test",
srcs = ["call_push_pull_test.cc"],
external_deps = [
"gtest",
"absl/status",
],
language = "c++",
uses_polling = False,
deps = [
"//:call_push_pull",
"//test/core/util:grpc_suppressions",
],
)

@ -0,0 +1,76 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/call_push_pull.h"
#include <gtest/gtest.h>
#include "absl/status/status.h"
namespace grpc_core {
TEST(CallPushPullTest, Empty) {
auto p = CallPushPull([] { return absl::OkStatus(); },
[] { return absl::OkStatus(); },
[] { return absl::OkStatus(); });
EXPECT_EQ(p(), Poll<absl::Status>(absl::OkStatus()));
}
TEST(CallPushPullTest, Paused) {
auto p = CallPushPull([]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(p(), Poll<absl::Status>(Pending{}));
}
TEST(CallPushPullTest, OneReady) {
auto a = CallPushPull([]() -> Poll<absl::Status> { return absl::OkStatus(); },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(a(), Poll<absl::Status>(Pending{}));
auto b = CallPushPull([]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::OkStatus(); },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(b(), Poll<absl::Status>(Pending{}));
auto c =
CallPushPull([]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::OkStatus(); });
EXPECT_EQ(c(), Poll<absl::Status>(Pending{}));
}
TEST(CallPushPullTest, OneFailed) {
auto a = CallPushPull(
[]() -> Poll<absl::Status> { return absl::UnknownError("bah"); },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(a(), Poll<absl::Status>(absl::UnknownError("bah")));
auto b = CallPushPull(
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::UnknownError("humbug"); },
[]() -> Poll<absl::Status> { return Pending{}; });
EXPECT_EQ(b(), Poll<absl::Status>(absl::UnknownError("humbug")));
auto c = CallPushPull(
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return Pending{}; },
[]() -> Poll<absl::Status> { return absl::UnknownError("wha"); });
EXPECT_EQ(c(), Poll<absl::Status>(absl::UnknownError("wha")));
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -3159,6 +3159,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "call_push_pull_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save