[promises] Inter-activity pipe (#34188)

Pipe-like type (has a send end, a receive end, and a closing mechanism)
for cross-activity transfers.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/34211/head
Craig Tiller 2 years ago committed by GitHub
parent c405f75a5a
commit 60c6b6bb3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      CMakeLists.txt
  2. 30
      build_autogenerated.yaml
  3. 19
      src/core/BUILD
  4. 148
      src/core/lib/promise/inter_activity_pipe.h
  5. 19
      test/core/promise/BUILD
  6. 39
      test/core/promise/for_each_test.cc
  7. 113
      test/core/promise/inter_activity_pipe_test.cc
  8. 24
      tools/run_tests/generated/tests.json

37
CMakeLists.txt generated

@ -1065,6 +1065,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx init_test)
add_dependencies(buildtests_cxx initial_settings_frame_bad_client_test)
add_dependencies(buildtests_cxx insecure_security_connector_test)
add_dependencies(buildtests_cxx inter_activity_pipe_test)
add_dependencies(buildtests_cxx interceptor_list_test)
add_dependencies(buildtests_cxx interop_client)
add_dependencies(buildtests_cxx interop_server)
@ -14408,6 +14409,42 @@ target_link_libraries(insecure_security_connector_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(inter_activity_pipe_test
src/core/lib/promise/activity.cc
test/core/promise/inter_activity_pipe_test.cc
)
target_compile_features(inter_activity_pipe_test PUBLIC cxx_std_14)
target_include_directories(inter_activity_pipe_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(inter_activity_pipe_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::type_traits
absl::statusor
gpr
)
endif()
if(gRPC_BUILD_TESTS)

@ -8642,6 +8642,7 @@ targets:
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/if.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/interceptor_list.h
- src/core/lib/promise/join.h
- src/core/lib/promise/loop.h
@ -9979,6 +9980,35 @@ targets:
deps:
- gtest
- grpc_test_util
- name: inter_activity_pipe_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/seq.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/promise/activity.cc
- test/core/promise/inter_activity_pipe_test.cc
deps:
- gtest
- absl/meta:type_traits
- absl/status:statusor
- gpr
uses_polling: false
- name: interceptor_list_test
gtest: true
build: test

@ -884,6 +884,25 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "inter_activity_pipe",
hdrs = [
"lib/promise/inter_activity_pipe.h",
],
external_deps = [
"absl/base:core_headers",
"absl/types:optional",
],
language = "c++",
deps = [
"activity",
"poll",
"ref_counted",
"//:gpr",
"//:ref_counted_ptr",
],
)
grpc_cc_library(
name = "promise_trace",
srcs = [

@ -0,0 +1,148 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
#define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <array>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
template <typename T, uint8_t kQueueSize>
class InterActivityPipe {
private:
class Center : public RefCounted<Center, NonPolymorphicRefCount> {
public:
Poll<bool> Push(T& value) {
ReleasableMutexLock lock(&mu_);
if (closed_) return false;
if (count_ == kQueueSize) {
on_available_ = Activity::current()->MakeNonOwningWaker();
return Pending{};
}
queue_[(first_ + count_) % kQueueSize] = std::move(value);
++count_;
if (count_ == 1) {
auto on_occupied = std::move(on_occupied_);
lock.Release();
on_occupied.Wakeup();
}
return true;
}
Poll<absl::optional<T>> Next() {
ReleasableMutexLock lock(&mu_);
if (count_ == 0) {
if (closed_) return absl::nullopt;
on_occupied_ = Activity::current()->MakeNonOwningWaker();
return Pending{};
}
auto value = std::move(queue_[first_]);
first_ = (first_ + 1) % kQueueSize;
--count_;
if (count_ == kQueueSize - 1) {
auto on_available = std::move(on_available_);
lock.Release();
on_available.Wakeup();
}
return std::move(value);
}
void MarkClosed() {
ReleasableMutexLock lock(&mu_);
if (std::exchange(closed_, true)) return;
auto on_occupied = std::move(on_occupied_);
auto on_available = std::move(on_available_);
lock.Release();
on_occupied.Wakeup();
on_available.Wakeup();
}
private:
Mutex mu_;
std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
bool closed_ ABSL_GUARDED_BY(mu_) = false;
uint8_t first_ ABSL_GUARDED_BY(mu_) = 0;
uint8_t count_ ABSL_GUARDED_BY(mu_) = 0;
Waker on_occupied_ ABSL_GUARDED_BY(mu_);
Waker on_available_ ABSL_GUARDED_BY(mu_);
};
RefCountedPtr<Center> center_{MakeRefCounted<Center>()};
public:
class Sender {
public:
explicit Sender(RefCountedPtr<Center> center)
: center_(std::move(center)) {}
Sender(const Sender&) = delete;
Sender& operator=(const Sender&) = delete;
Sender(Sender&&) noexcept = default;
Sender& operator=(Sender&&) noexcept = default;
~Sender() {
if (center_ != nullptr) center_->MarkClosed();
}
auto Push(T value) {
return [center = center_, value = std::move(value)]() mutable {
return center->Push(value);
};
}
private:
RefCountedPtr<Center> center_;
};
class Receiver {
public:
explicit Receiver(RefCountedPtr<Center> center)
: center_(std::move(center)) {}
Receiver(const Receiver&) = delete;
Receiver& operator=(const Receiver&) = delete;
Receiver(Receiver&&) noexcept = default;
Receiver& operator=(Receiver&&) noexcept = default;
~Receiver() {
if (center_ != nullptr) center_->MarkClosed();
}
auto Next() {
return [center = center_]() { return center->Next(); };
}
private:
RefCountedPtr<Center> center_;
};
Sender sender{center_};
Receiver receiver{center_};
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H

@ -336,6 +336,7 @@ grpc_cc_test(
"//src/core:arena",
"//src/core:event_engine_memory_allocator",
"//src/core:for_each",
"//src/core:inter_activity_pipe",
"//src/core:join",
"//src/core:map",
"//src/core:memory_quota",
@ -400,6 +401,24 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "inter_activity_pipe_test",
srcs = ["inter_activity_pipe_test.cc"],
external_deps = [
"absl/status",
"gtest",
],
language = "c++",
tags = ["promise_test"],
uses_event_engine = False,
uses_polling = False,
deps = [
"test_wakeup_schedulers",
"//src/core:inter_activity_pipe",
"//src/core:seq",
],
)
grpc_cc_test(
name = "mpsc_test",
srcs = ["mpsc_test.cc"],

@ -26,6 +26,7 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/inter_activity_pipe.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
@ -83,6 +84,44 @@ TEST_F(ForEachTest, SendThriceWithPipe) {
EXPECT_EQ(num_received, 3);
}
TEST_F(ForEachTest, SendThriceWithInterActivityPipe) {
int num_received = 0;
StrictMock<MockFunction<void(absl::Status)>> on_done_sender;
StrictMock<MockFunction<void(absl::Status)>> on_done_receiver;
EXPECT_CALL(on_done_sender, Call(absl::OkStatus()));
EXPECT_CALL(on_done_receiver, Call(absl::OkStatus()));
InterActivityPipe<int, 1> pipe;
auto send_activity = MakeActivity(
Seq(
// Push 3 things into a pipe -- 1, 2, then 3 -- then close.
pipe.sender.Push(1), [&pipe] { return pipe.sender.Push(2); },
[&pipe] { return pipe.sender.Push(3); },
[&pipe] {
auto x = std::move(pipe.sender);
return absl::OkStatus();
}),
InlineWakeupScheduler{}, [&on_done_sender](absl::Status status) {
on_done_sender.Call(std::move(status));
});
MakeActivity(
[&num_received, &pipe] {
// Use a ForEach loop to read them out and verify
// all values are seen.
return ForEach(std::move(pipe.receiver), [&num_received](int i) {
num_received++;
EXPECT_EQ(num_received, i);
return absl::OkStatus();
});
},
NoWakeupScheduler(),
[&on_done_receiver](absl::Status status) {
on_done_receiver.Call(std::move(status));
});
Mock::VerifyAndClearExpectations(&on_done_sender);
Mock::VerifyAndClearExpectations(&on_done_receiver);
EXPECT_EQ(num_received, 3);
}
// Pollable type that stays movable until it's polled, then causes the test to
// fail if it's moved again.
// Promises have the property that they can be moved until polled, and this

@ -0,0 +1,113 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/inter_activity_pipe.h"
#include "absl/status/status.h"
#include "gtest/gtest.h"
#include "src/core/lib/promise/seq.h"
#include "test/core/promise/test_wakeup_schedulers.h"
namespace grpc_core {
template <typename F>
ActivityPtr TestActivity(F f) {
return MakeActivity(std::move(f), InlineWakeupScheduler{},
[](absl::Status status) { EXPECT_TRUE(status.ok()); });
}
TEST(InterActivityPipe, CanSendAndReceive) {
InterActivityPipe<int, 1> pipe;
bool done = false;
auto a = TestActivity(Seq(pipe.sender.Push(3), [](bool b) {
EXPECT_TRUE(b);
return absl::OkStatus();
}));
EXPECT_FALSE(done);
auto b =
TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional<int> n) {
EXPECT_EQ(n, 3);
done = true;
return absl::OkStatus();
}));
EXPECT_TRUE(done);
}
TEST(InterActivityPipe, CanSendTwiceAndReceive) {
InterActivityPipe<int, 1> pipe;
bool done = false;
auto a = TestActivity(Seq(
pipe.sender.Push(3),
[&](bool b) {
EXPECT_TRUE(b);
return pipe.sender.Push(4);
},
[](bool b) {
EXPECT_TRUE(b);
return absl::OkStatus();
}));
EXPECT_FALSE(done);
auto b = TestActivity(Seq(
pipe.receiver.Next(),
[&pipe](absl::optional<int> n) {
EXPECT_EQ(n, 3);
return pipe.receiver.Next();
},
[&done](absl::optional<int> n) {
EXPECT_EQ(n, 4);
done = true;
return absl::OkStatus();
}));
EXPECT_TRUE(done);
}
TEST(InterActivityPipe, CanReceiveAndSend) {
InterActivityPipe<int, 1> pipe;
bool done = false;
auto b =
TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional<int> n) {
EXPECT_EQ(n, 3);
done = true;
return absl::OkStatus();
}));
EXPECT_FALSE(done);
auto a = TestActivity(Seq(pipe.sender.Push(3), [](bool b) {
EXPECT_TRUE(b);
return absl::OkStatus();
}));
EXPECT_TRUE(done);
}
TEST(InterActivityPipe, CanClose) {
InterActivityPipe<int, 1> pipe;
bool done = false;
auto b =
TestActivity(Seq(pipe.receiver.Next(), [&done](absl::optional<int> n) {
EXPECT_EQ(n, absl::nullopt);
done = true;
return absl::OkStatus();
}));
EXPECT_FALSE(done);
// Drop the sender
{ auto x = std::move(pipe.sender); }
EXPECT_TRUE(done);
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -4821,6 +4821,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "inter_activity_pipe_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save