From 2caa79feeb53c982e7afad1a4f872cdbf166aefd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 11 Jan 2023 15:11:17 -0800 Subject: [PATCH] [promises] Multi-producer, single-consumer for inter-activity comms (#31927) * mpsc * Automated change: Fix sanity tests * tests * tests * Automated change: Fix sanity tests * comments * comments * fix * Automated change: Fix sanity tests Co-authored-by: ctiller --- CMakeLists.txt | 44 ++++++ build_autogenerated.yaml | 30 ++++ src/core/BUILD | 17 +++ src/core/lib/promise/mpsc.h | 197 +++++++++++++++++++++++++++ test/core/promise/BUILD | 19 +++ test/core/promise/mpsc_test.cc | 156 +++++++++++++++++++++ tools/run_tests/generated/tests.json | 24 ++++ 7 files changed, 487 insertions(+) create mode 100644 src/core/lib/promise/mpsc.h create mode 100644 test/core/promise/mpsc_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index d947df615d4..c0d673c3fcd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1058,6 +1058,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx miscompile_with_no_unique_address_test) add_dependencies(buildtests_cxx mock_stream_test) add_dependencies(buildtests_cxx mock_test) + add_dependencies(buildtests_cxx mpsc_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx mpscq_test) endif() @@ -15112,6 +15113,49 @@ target_link_libraries(mock_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(mpsc_test + src/core/lib/promise/activity.cc + test/core/promise/mpsc_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(mpsc_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(mpsc_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::flat_hash_set + absl::hash + absl::type_traits + absl::statusor + absl::utility + gpr +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 8ce0d179b51..95061f2e9c0 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -9262,6 +9262,36 @@ targets: deps: - grpc++_test - grpc++_test_util +- name: mpsc_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/promise/activity.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/promise_factory.h + - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/mpsc.h + - src/core/lib/promise/poll.h + - src/core/lib/promise/promise.h + - src/core/lib/promise/wait_set.h + src: + - src/core/lib/promise/activity.cc + - test/core/promise/mpsc_test.cc + deps: + - absl/container:flat_hash_set + - absl/hash:hash + - absl/meta:type_traits + - absl/status:statusor + - absl/utility:utility + - gpr + uses_polling: false - name: mpscq_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 012e302a6d6..5492d45fab4 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -832,6 +832,23 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "mpsc", + hdrs = [ + "lib/promise/mpsc.h", + ], + external_deps = ["absl/base:core_headers"], + language = "c++", + deps = [ + "activity", + "poll", + "ref_counted", + "wait_set", + "//:gpr", + "//:ref_counted_ptr", + ], +) + grpc_cc_library( name = "for_each", external_deps = [ diff --git a/src/core/lib/promise/mpsc.h b/src/core/lib/promise/mpsc.h new file mode 100644 index 00000000000..0959521b8dd --- /dev/null +++ b/src/core/lib/promise/mpsc.h @@ -0,0 +1,197 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_PROMISE_MPSC_H +#define GRPC_CORE_LIB_PROMISE_MPSC_H + +#include + +#include + +#include +#include +#include + +#include "absl/base/thread_annotations.h" + +#include + +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/wait_set.h" + +// Multi producer single consumer inter-activity comms. + +namespace grpc_core { + +namespace mpscpipe_detail { + +// "Center" of the communication pipe. +// Contains sent but not received messages, and open/close state. +template +class Center : public RefCounted> { + public: + // Construct the center with a maximum queue size. + explicit Center(size_t max_queued) : max_queued_(max_queued) {} + + // Poll for new items. + // - Returns true if new items were obtained, in which case they are contained + // in dest in the order they were added. Wakes up all pending senders since + // there will now be space to send. + // - If no new items are available, returns + // false and sets up a waker to be awoken when more items are available. + // TODO(ctiller): consider the problem of thundering herds here. There may be + // more senders than there are queue spots, and so the strategy of waking up + // all senders is ill-advised. + // That said, some senders may have been cancelled by the time we wake them, + // and so waking a subset could cause starvation. + bool PollReceiveBatch(std::vector& dest) { + ReleasableMutexLock lock(&mu_); + if (queue_.empty()) { + receive_waker_ = Activity::current()->MakeNonOwningWaker(); + return false; + } + dest.swap(queue_); + queue_.clear(); + auto wakeups = send_wakers_.TakeWakeupSet(); + lock.Release(); + wakeups.Wakeup(); + return true; + } + + // Poll to send one item. + // Returns pending if no send slot was available. + // Returns true if the item was sent. + // Returns false if the receiver has been closed. + Poll PollSend(T& t) { + ReleasableMutexLock lock(&mu_); + if (receiver_closed_) return Poll(false); + if (queue_.size() < max_queued_) { + queue_.push_back(std::move(t)); + auto receive_waker = std::move(receive_waker_); + lock.Release(); + receive_waker.Wakeup(); + return Poll(true); + } + send_wakers_.AddPending(Activity::current()->MakeNonOwningWaker()); + return Pending{}; + } + + // Mark that the receiver is closed. + void ReceiverClosed() { + MutexLock lock(&mu_); + receiver_closed_ = true; + } + + private: + Mutex mu_; + const size_t max_queued_; + std::vector queue_ ABSL_GUARDED_BY(mu_); + bool receiver_closed_ ABSL_GUARDED_BY(mu_) = false; + Waker receive_waker_ ABSL_GUARDED_BY(mu_); + WaitSet send_wakers_ ABSL_GUARDED_BY(mu_); +}; + +} // namespace mpscpipe_detail + +template +class MpscReceiver; + +// Send half of an mpsc pipe. +template +class MpscSender { + public: + MpscSender(const MpscSender&) = delete; + MpscSender& operator=(const MpscSender&) = delete; + MpscSender(MpscSender&&) noexcept = default; + MpscSender& operator=(MpscSender&&) noexcept = default; + + // Return a promise that will send one item. + // Resolves to true if sent, false if the receiver was closed (and the value + // will never be successfully sent). + auto Send(T t) { + return [this, t = std::move(t)]() mutable { return center_->PollSend(t); }; + } + + private: + friend class MpscReceiver; + explicit MpscSender(RefCountedPtr> center) + : center_(std::move(center)) {} + RefCountedPtr> center_; +}; + +// Receive half of an mpsc pipe. +template +class MpscReceiver { + public: + // max_buffer_hint is the maximum number of elements we'd like to buffer. + // We half this before passing to Center so that the number there is the + // maximum number of elements that can be queued in the center of the pipe. + // The receiver also holds some of the buffered elements (up to half of them!) + // so the total outstanding is equal to max_buffer_hint (unless it's 1 in + // which case instantaneosly we may have two elements buffered). + explicit MpscReceiver(size_t max_buffer_hint) + : center_(MakeRefCounted>( + std::max(static_cast(1), max_buffer_hint / 2))) {} + ~MpscReceiver() { + if (center_ != nullptr) center_->ReceiverClosed(); + } + MpscReceiver(const MpscReceiver&) = delete; + MpscReceiver& operator=(const MpscReceiver&) = delete; + // Only movable until it's first polled, and so we don't need to contend with + // a non-empty buffer during a legal move! + MpscReceiver(MpscReceiver&& other) noexcept + : center_(std::move(other.center_)) { + GPR_DEBUG_ASSERT(other.buffer_.empty()); + } + MpscReceiver& operator=(MpscReceiver&& other) noexcept { + GPR_DEBUG_ASSERT(other.buffer_.empty()); + center_ = std::move(other.center_); + return *this; + } + + // Construct a new sender for this receiver. + MpscSender MakeSender() { return MpscSender(center_); } + + // Return a promise that will resolve to the next item (and remove said item). + auto Next() { + return [this]() -> Poll { + if (buffer_it_ != buffer_.end()) { + return Poll(std::move(*buffer_it_++)); + } + if (center_->PollReceiveBatch(buffer_)) { + buffer_it_ = buffer_.begin(); + return Poll(std::move(*buffer_it_++)); + } + return Pending{}; + }; + } + + private: + // Received items. We move out of here one by one, but don't resize the + // vector. Instead, when we run out of items, we poll the center for more - + // which swaps this buffer in for the new receive queue and clears it. + // In this way, upon hitting a steady state the queue ought to be allocation + // free. + std::vector buffer_; + typename std::vector::iterator buffer_it_ = buffer_.end(); + RefCountedPtr> center_; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_PROMISE_MPSC_H diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 772ea391321..6546e89525c 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -392,6 +392,25 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "mpsc_test", + srcs = ["mpsc_test.cc"], + external_deps = [ + "absl/types:optional", + "gtest", + ], + language = "c++", + tags = ["promise_test"], + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:gpr", + "//:promise", + "//src/core:activity", + "//src/core:mpsc", + ], +) + grpc_proto_fuzzer( name = "promise_fuzzer", srcs = ["promise_fuzzer.cc"], diff --git a/test/core/promise/mpsc_test.cc b/test/core/promise/mpsc_test.cc new file mode 100644 index 00000000000..afd4fa6c78f --- /dev/null +++ b/test/core/promise/mpsc_test.cc @@ -0,0 +1,156 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/promise/mpsc.h" + +#include +#include + +#include "absl/types/optional.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include + +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/promise.h" + +using testing::Mock; +using testing::StrictMock; + +namespace grpc_core { +namespace { + +class MockActivity : public Activity, public Wakeable { + public: + MOCK_METHOD(void, WakeupRequested, ()); + + void ForceImmediateRepoll() override { WakeupRequested(); } + void Orphan() override {} + Waker MakeOwningWaker() override { return Waker(this); } + Waker MakeNonOwningWaker() override { return Waker(this); } + void Wakeup() override { WakeupRequested(); } + void Drop() override {} + std::string DebugTag() const override { return "MockActivity"; } + std::string ActivityDebugTag() const override { return DebugTag(); } + + void Activate() { + if (scoped_activity_ != nullptr) return; + scoped_activity_ = std::make_unique(this); + } + + void Deactivate() { scoped_activity_.reset(); } + + private: + std::unique_ptr scoped_activity_; +}; + +struct Payload { + std::unique_ptr x; + bool operator==(const Payload& other) const { + return (x == nullptr && other.x == nullptr) || + (x != nullptr && other.x != nullptr && *x == *other.x); + } +}; +Payload MakePayload(int value) { return {std::make_unique(value)}; } + +TEST(MpscTest, NoOp) { MpscReceiver receiver(1); } + +TEST(MpscTest, MakeSender) { + MpscReceiver receiver(1); + MpscSender sender = receiver.MakeSender(); +} + +TEST(MpscTest, SendOneThingInstantly) { + MpscReceiver receiver(1); + MpscSender sender = receiver.MakeSender(); + EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true); +} + +TEST(MpscTest, SendOneThingInstantlyAndReceiveInstantly) { + MpscReceiver receiver(1); + MpscSender sender = receiver.MakeSender(); + EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1)); +} + +TEST(MpscTest, SendingLotsOfThingsGivesPushback) { + StrictMock activity1; + MpscReceiver receiver(1); + MpscSender sender = receiver.MakeSender(); + + activity1.Activate(); + EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true); + EXPECT_EQ(NowOrNever(sender.Send(MakePayload(2))), absl::nullopt); + activity1.Deactivate(); +} + +TEST(MpscTest, ReceivingAfterBlockageWakesUp) { + StrictMock activity1; + StrictMock activity2; + MpscReceiver receiver(1); + MpscSender sender = receiver.MakeSender(); + + activity1.Activate(); + EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), true); + auto send2 = sender.Send(MakePayload(2)); + EXPECT_EQ(send2(), Poll(Pending{})); + activity1.Deactivate(); + + activity2.Activate(); + EXPECT_CALL(activity1, WakeupRequested()); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1)); + Mock::VerifyAndClearExpectations(&activity1); + auto receive2 = receiver.Next(); + EXPECT_EQ(receive2(), Poll(Pending{})); + activity2.Deactivate(); + + activity1.Activate(); + EXPECT_CALL(activity2, WakeupRequested()); + EXPECT_EQ(send2(), Poll(true)); + Mock::VerifyAndClearExpectations(&activity2); + activity1.Deactivate(); + + activity2.Activate(); + EXPECT_EQ(receive2(), Poll(MakePayload(2))); + activity2.Deactivate(); +} + +TEST(MpscTest, BigBufferAllowsBurst) { + MpscReceiver receiver(50); + MpscSender sender = receiver.MakeSender(); + + for (int i = 0; i < 25; i++) { + EXPECT_EQ(NowOrNever(sender.Send(MakePayload(i))), true); + } + for (int i = 0; i < 25; i++) { + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(i)); + } +} + +TEST(MpscTest, ClosureIsVisibleToSenders) { + auto receiver = std::make_unique>(1); + MpscSender sender = receiver->MakeSender(); + receiver.reset(); + EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), false); +} + +} // namespace +} // namespace grpc_core + +int main(int argc, char** argv) { + gpr_log_verbosity_init(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 6b1126714fd..555206ab17f 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4833,6 +4833,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "mpsc_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,