diff --git a/BUILD b/BUILD index 4f00a9d422a..a16ba60927b 100644 --- a/BUILD +++ b/BUILD @@ -2427,6 +2427,27 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "event_engine_work_queue", + srcs = [ + "src/core/lib/event_engine/work_queue.cc", + ], + hdrs = [ + "src/core/lib/event_engine/work_queue.h", + ], + external_deps = [ + "absl/base:core_headers", + "absl/functional:any_invocable", + "absl/types:optional", + ], + deps = [ + "common_event_engine_closures", + "event_engine_base_hdrs", + "gpr", + "time", + ], +) + grpc_cc_library( name = "event_engine_threaded_executor", srcs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 292b41f7941..fe6c43678e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1242,6 +1242,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx window_overflow_bad_client_test) add_dependencies(buildtests_cxx wire_reader_test) add_dependencies(buildtests_cxx wire_writer_test) + add_dependencies(buildtests_cxx work_queue_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx work_serializer_test) endif() @@ -19740,6 +19741,42 @@ target_link_libraries(wire_writer_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(work_queue_test + src/core/lib/event_engine/work_queue.cc + test/core/event_engine/work_queue/work_queue_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(work_queue_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(work_queue_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util_unsecure +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 7bc9f86a31f..c5f464d2f42 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -10758,6 +10758,18 @@ targets: - absl/cleanup:cleanup - grpc_test_util uses_polling: false +- name: work_queue_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/common_closures.h + - src/core/lib/event_engine/work_queue.h + src: + - src/core/lib/event_engine/work_queue.cc + - test/core/event_engine/work_queue/work_queue_test.cc + deps: + - grpc_test_util_unsecure - name: work_serializer_test gtest: true build: test diff --git a/src/core/lib/event_engine/work_queue.cc b/src/core/lib/event_engine/work_queue.cc new file mode 100644 index 00000000000..456716a4560 --- /dev/null +++ b/src/core/lib/event_engine/work_queue.cc @@ -0,0 +1,194 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/event_engine/work_queue.h" + +#include +#include +#include + +#include "absl/functional/any_invocable.h" + +#include "src/core/lib/event_engine/common_closures.h" + +namespace grpc_event_engine { +namespace experimental { + +// ------ WorkQueue::Storage -------------------------------------------------- + +WorkQueue::Storage::Storage(EventEngine::Closure* closure) noexcept + : closure_(closure), + enqueued_( + grpc_core::Timestamp::Now().milliseconds_after_process_epoch()) {} + +WorkQueue::Storage::Storage(absl::AnyInvocable callback) noexcept + : closure_(SelfDeletingClosure::Create(std::move(callback))), + enqueued_( + grpc_core::Timestamp::Now().milliseconds_after_process_epoch()) {} + +WorkQueue::Storage::Storage(Storage&& other) noexcept + : closure_(other.closure_), enqueued_(other.enqueued_) {} + +WorkQueue::Storage& WorkQueue::Storage::operator=(Storage&& other) noexcept { + std::swap(closure_, other.closure_); + std::swap(enqueued_, other.enqueued_); + return *this; +} + +EventEngine::Closure* WorkQueue::Storage::closure() { return closure_; } + +// ------ WorkQueue ----------------------------------------------------------- + +// Returns whether the queue is empty +bool WorkQueue::Empty() const { + return (most_recent_element_enqueue_timestamp_.load( + std::memory_order_relaxed) == kInvalidTimestamp && + oldest_enqueued_timestamp_.load(std::memory_order_relaxed) == + kInvalidTimestamp); +} + +grpc_core::Timestamp WorkQueue::OldestEnqueuedTimestamp() const { + int64_t front_of_queue_timestamp = + oldest_enqueued_timestamp_.load(std::memory_order_relaxed); + if (front_of_queue_timestamp != kInvalidTimestamp) { + return grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( + front_of_queue_timestamp); + } + int64_t most_recent_millis = + most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed); + if (most_recent_millis == kInvalidTimestamp) { + return grpc_core::Timestamp::InfPast(); + } + return grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( + most_recent_millis); +} + +EventEngine::Closure* WorkQueue::PopFront() ABSL_LOCKS_EXCLUDED(mu_) { + if (oldest_enqueued_timestamp_.load(std::memory_order_relaxed) != + kInvalidTimestamp) { + EventEngine::Closure* t = TryLockAndPop(/*front=*/true); + if (t != nullptr) return t; + } + if (most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed) != + kInvalidTimestamp) { + return TryPopMostRecentElement(); + } + return nullptr; +} + +EventEngine::Closure* WorkQueue::PopBack() { + if (most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed) != + kInvalidTimestamp) { + return TryPopMostRecentElement(); + } + if (oldest_enqueued_timestamp_.load(std::memory_order_relaxed) != + kInvalidTimestamp) { + EventEngine::Closure* t = TryLockAndPop(/*front=*/false); + if (t != nullptr) return t; + } + return nullptr; +} + +void WorkQueue::Add(EventEngine::Closure* closure) { + AddInternal(Storage(closure)); +} + +void WorkQueue::Add(absl::AnyInvocable invocable) { + AddInternal(Storage(std::move(invocable))); +} + +void WorkQueue::AddInternal(Storage&& storage) { + Storage previous_most_recent; + int64_t previous_ts; + { + absl::optional tmp_element; + { + grpc_core::MutexLock lock(&most_recent_element_lock_); + previous_ts = most_recent_element_enqueue_timestamp_.exchange( + storage.enqueued(), std::memory_order_relaxed); + tmp_element = std::exchange(most_recent_element_, std::move(storage)); + } + if (!tmp_element.has_value() || previous_ts == kInvalidTimestamp) return; + previous_most_recent = std::move(*tmp_element); + } + grpc_core::MutexLock lock(&mu_); + if (elements_.empty()) { + oldest_enqueued_timestamp_.store(previous_ts, std::memory_order_relaxed); + } + elements_.push_back(std::move(previous_most_recent)); +} + +EventEngine::Closure* WorkQueue::TryLockAndPop(bool front) + ABSL_LOCKS_EXCLUDED(mu_) { + // Do not block the worker if there are other workers trying to pop + // tasks from this queue. + if (!mu_.TryLock()) return nullptr; + auto ret = PopLocked(front); + mu_.Unlock(); + return ret; +} + +EventEngine::Closure* WorkQueue::PopLocked(bool front) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + if (GPR_UNLIKELY(elements_.empty())) { + if (most_recent_element_enqueue_timestamp_.load( + std::memory_order_relaxed) == kInvalidTimestamp) { + return nullptr; + } + if (!most_recent_element_lock_.TryLock()) return nullptr; + absl::optional ret; + if (GPR_LIKELY(most_recent_element_.has_value())) { + most_recent_element_enqueue_timestamp_.store(kInvalidTimestamp, + std::memory_order_relaxed); + ret = std::exchange(most_recent_element_, absl::nullopt); + } + most_recent_element_lock_.Unlock(); + return ret->closure(); + } + // the queue has elements, let's pop one and update timestamps + Storage ret_s; + if (front) { + ret_s = std::move(elements_.front()); + elements_.pop_front(); + } else { + ret_s = std::move(elements_.back()); + elements_.pop_back(); + } + if (elements_.empty()) { + oldest_enqueued_timestamp_.store(kInvalidTimestamp, + std::memory_order_relaxed); + } else if (front) { + oldest_enqueued_timestamp_.store(elements_.front().enqueued(), + std::memory_order_relaxed); + } + return ret_s.closure(); +} + +EventEngine::Closure* WorkQueue::TryPopMostRecentElement() { + if (!most_recent_element_lock_.TryLock()) return nullptr; + if (GPR_UNLIKELY(!most_recent_element_.has_value())) { + most_recent_element_lock_.Unlock(); + return nullptr; + } + most_recent_element_enqueue_timestamp_.store(kInvalidTimestamp, + std::memory_order_relaxed); + absl::optional tmp = + std::exchange(most_recent_element_, absl::nullopt); + most_recent_element_lock_.Unlock(); + return tmp->closure(); +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/work_queue.h b/src/core/lib/event_engine/work_queue.h new file mode 100644 index 00000000000..86e3a041c2f --- /dev/null +++ b/src/core/lib/event_engine/work_queue.h @@ -0,0 +1,121 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H +#define GRPC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H + +#include + +#include + +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/functional/any_invocable.h" +#include "absl/types/optional.h" + +#include + +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/time.h" + +namespace grpc_event_engine { +namespace experimental { + +// A fast work queue based lightly on an internal Google implementation. +// +// This uses atomics to access the most recent element in the queue, making it +// fast for LIFO operations. Accessing the oldest (next) element requires taking +// a mutex lock. +class WorkQueue { + public: + // comparable to Timestamp::milliseconds_after_process_epoch() + static const int64_t kInvalidTimestamp = -1; + + WorkQueue() = default; + // Returns whether the queue is empty + bool Empty() const; + // Returns the Timestamp of when the most recently-added element was + // enqueued. + grpc_core::Timestamp OldestEnqueuedTimestamp() const; + // Returns the next (oldest) element from the queue, or nullopt if empty + EventEngine::Closure* PopFront() ABSL_LOCKS_EXCLUDED(mu_); + // Returns the most recent element from the queue, or nullopt if empty + EventEngine::Closure* PopBack(); + // Adds a closure to the back of the queue + void Add(EventEngine::Closure* closure); + // Wraps an AnyInvocable and adds it to the back of the queue + void Add(absl::AnyInvocable invocable); + + private: + class Storage { + public: + Storage() = default; + // Take a non-owned Closure* + // Requires an exec_ctx on the stack + // TODO(ctiller): replace with an alternative time source + explicit Storage(EventEngine::Closure* closure) noexcept; + // Wrap an AnyInvocable into a Closure. + // The closure must be executed or explicitly deleted to prevent memory + // leaks. Requires an exec_ctx on the stack + // TODO(ctiller): replace with an alternative time source + explicit Storage(absl::AnyInvocable callback) noexcept; + ~Storage() = default; + // not copyable + Storage(const Storage&) = delete; + Storage& operator=(const Storage&) = delete; + // moveable + Storage(Storage&& other) noexcept; + Storage& operator=(Storage&& other) noexcept; + // Is this enqueued? + int64_t enqueued() const { return enqueued_; } + // Get the stored closure, or wrapped AnyInvocable + EventEngine::Closure* closure(); + + private: + EventEngine::Closure* closure_ = nullptr; + int64_t enqueued_ = kInvalidTimestamp; + }; + + // Attempts to pop from the front of the queue (oldest). + // This will return nullopt if the queue is empty, or if other workers + // are already attempting to pop from this queue. + EventEngine::Closure* TryLockAndPop(bool front) ABSL_LOCKS_EXCLUDED(mu_); + // Internal implementation, helps with thread safety analysis in TryLockAndPop + EventEngine::Closure* PopLocked(bool front) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + // Attempts to pop from the back of the queue (most recent). + // This will return nullopt if the queue is empty, or if other workers + // are already attempting to pop from this queue. + EventEngine::Closure* TryPopMostRecentElement(); + // Common code for the Add methods + void AddInternal(Storage&& storage); + + // The managed items in the queue + std::deque elements_ ABSL_GUARDED_BY(mu_); + // The most recently enqueued element. This is reserved from work stealing + absl::optional most_recent_element_ + ABSL_GUARDED_BY(most_recent_element_lock_); + grpc_core::Mutex ABSL_ACQUIRED_AFTER(mu_) most_recent_element_lock_; + // TODO(hork): consider ABSL_CACHELINE_ALIGNED + std::atomic most_recent_element_enqueue_timestamp_{ + kInvalidTimestamp}; + std::atomic oldest_enqueued_timestamp_{kInvalidTimestamp}; + grpc_core::Mutex mu_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H diff --git a/test/core/event_engine/work_queue/BUILD b/test/core/event_engine/work_queue/BUILD new file mode 100644 index 00000000000..60636097a24 --- /dev/null +++ b/test/core/event_engine/work_queue/BUILD @@ -0,0 +1,51 @@ +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") +load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer") + +licenses(["notice"]) + +grpc_package( + name = "test/core/event_engine/work_queue", + visibility = "tests", +) + +grpc_cc_test( + name = "work_queue_test", + srcs = ["work_queue_test.cc"], + external_deps = ["gtest"], + deps = [ + "//:common_event_engine_closures", + "//:event_engine_work_queue", + "//:exec_ctx", + "//:gpr_platform", + "//test/core/util:grpc_test_util_unsecure", + ], +) + +grpc_proto_fuzzer( + name = "work_queue_fuzzer", + srcs = ["work_queue_fuzzer.cc"], + corpus = "corpora", + language = "C++", + proto = "work_queue_fuzzer.proto", + tags = ["no_windows"], + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:event_engine_work_queue", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/event_engine/work_queue/corpora/empty b/test/core/event_engine/work_queue/corpora/empty new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/core/event_engine/work_queue/work_queue_fuzzer.cc b/test/core/event_engine/work_queue/work_queue_fuzzer.cc new file mode 100644 index 00000000000..544ccbbb367 --- /dev/null +++ b/test/core/event_engine/work_queue/work_queue_fuzzer.cc @@ -0,0 +1,137 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include + +#include "absl/container/flat_hash_map.h" + +#include "src/core/lib/event_engine/common_closures.h" +#include "src/core/lib/event_engine/work_queue.h" +#include "src/libfuzzer/libfuzzer_macro.h" +#include "test/core/event_engine/work_queue/work_queue_fuzzer.pb.h" + +bool squelch = true; +bool leak_check = true; + +namespace grpc_event_engine { +namespace experimental { + +class WorkQueueFuzzer { + public: + WorkQueueFuzzer() { CheckEqual(); }; + ~WorkQueueFuzzer() { CheckEqual(); }; + + void Run(const work_queue_fuzzer::Action& action) { + switch (action.action_type_case()) { + case work_queue_fuzzer::Action::kAdd: { + if (action.add().type() == work_queue_fuzzer::CALLBACK_TYPE_CLOSURE) { + work_queue_.Add(CreateClosure(action.add().key())); + deque_.push_back(CreateClosure(action.add().key())); + } else { + work_queue_.Add(CreateInvocable(action.add().key())); + deque_.push_back(CreateClosureWrappedInvocable(action.add().key())); + } + } break; + case work_queue_fuzzer::Action::kPopFront: { + // pop front closures, executing both to check they are a pair + auto* wq_c = work_queue_.PopFront(); + if (wq_c == nullptr) { + if (!work_queue_.Empty() || !deque_.empty()) abort(); + } else { + auto* dq_c = deque_.front(); + deque_.pop_front(); + wq_c->Run(); + dq_c->Run(); + } + } break; + case work_queue_fuzzer::Action::kPopBack: { + // pop back closures, executing both to check they are a pair + auto* wq_c = work_queue_.PopBack(); + if (wq_c == nullptr) { + if (!work_queue_.Empty() || !deque_.empty()) abort(); + } else { + auto* dq_c = deque_.back(); + deque_.pop_back(); + wq_c->Run(); + dq_c->Run(); + } + } break; + case work_queue_fuzzer::Action::kEmpty: { + if (work_queue_.Empty() != deque_.empty()) abort(); + } break; + case work_queue_fuzzer::Action::ACTION_TYPE_NOT_SET: + break; + }; + } + + private: + EventEngine::Closure* CreateClosure(int key) { + return SelfDeletingClosure::Create([key, this] { + if (last_executed_key_.has_value()) { + if (*last_executed_key_ != key) abort(); + last_executed_key_.reset(); + } else { + last_executed_key_ = key; + } + }); + } + + absl::AnyInvocable CreateInvocable(int key) { + return absl::AnyInvocable([key, this] { + if (last_executed_key_.has_value()) { + if (*last_executed_key_ != key) abort(); + last_executed_key_.reset(); + } else { + last_executed_key_ = key; + } + }); + } + + EventEngine::Closure* CreateClosureWrappedInvocable(int key) { + auto invocable = CreateInvocable(key); + return SelfDeletingClosure::Create( + [invocable = std::move(invocable)]() mutable { invocable(); }); + } + + void CheckEqual() { + while (auto* wq_c = work_queue_.PopBack()) { + if (deque_.empty()) abort(); + auto* dq_c = deque_.back(); + deque_.pop_back(); + wq_c->Run(); + dq_c->Run(); + } + } + + WorkQueue work_queue_; + std::deque deque_; + // Closures are always added in pairs and checked in paris. + // When checking, each popped closure encounters one of these situations: + // A) it is the first of a pair, denoted by an empty last_executed_key_, so + // it sets last_executed_key_ to its own key. + // B) last_executed_key_ is set, so its value must match this closure's own + // key to assert that it is the other part of the pair. last_executed_key_ + // is then reset. + absl::optional last_executed_key_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +DEFINE_PROTO_FUZZER(const work_queue_fuzzer::Msg& msg) { + for (const auto& action : msg.actions()) { + grpc_event_engine::experimental::WorkQueueFuzzer().Run(action); + } +} diff --git a/test/core/event_engine/work_queue/work_queue_fuzzer.proto b/test/core/event_engine/work_queue/work_queue_fuzzer.proto new file mode 100644 index 00000000000..0b906508ac5 --- /dev/null +++ b/test/core/event_engine/work_queue/work_queue_fuzzer.proto @@ -0,0 +1,48 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +package work_queue_fuzzer; + +enum CallbackType { + CALLBACK_TYPE_CLOSURE = 0; + CALLBACK_TYPE_ANY_INVOCABLE = 1; +} + +message Add { + CallbackType type = 1; + int32 key = 2; +} + +message PopFront { +} + +message PopBack { +} + +message Empty { +} + +message Action { + oneof action_type { + Add add = 1; + PopFront pop_front = 2; + PopBack pop_back = 3; + Empty empty = 4; + } +} + +message Msg { + repeated Action actions = 1; +} diff --git a/test/core/event_engine/work_queue/work_queue_test.cc b/test/core/event_engine/work_queue/work_queue_test.cc new file mode 100644 index 00000000000..2eb1463b1ad --- /dev/null +++ b/test/core/event_engine/work_queue/work_queue_test.cc @@ -0,0 +1,170 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/event_engine/work_queue.h" + +#include + +#include + +#include "src/core/lib/event_engine/common_closures.h" +#include "test/core/util/test_config.h" + +namespace { +using ::grpc_event_engine::experimental::AnyInvocableClosure; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::WorkQueue; + +TEST(WorkQueueTest, StartsEmpty) { + WorkQueue queue; + ASSERT_TRUE(queue.Empty()); +} + +TEST(WorkQueueTest, TakesClosures) { + WorkQueue queue; + bool ran = false; + AnyInvocableClosure closure([&ran] { ran = true; }); + queue.Add(&closure); + ASSERT_FALSE(queue.Empty()); + EventEngine::Closure* popped = queue.PopFront(); + ASSERT_NE(popped, nullptr); + popped->Run(); + ASSERT_TRUE(ran); + ASSERT_TRUE(queue.Empty()); +} + +TEST(WorkQueueTest, TakesAnyInvocables) { + WorkQueue queue; + bool ran = false; + queue.Add([&ran] { ran = true; }); + ASSERT_FALSE(queue.Empty()); + EventEngine::Closure* popped = queue.PopFront(); + ASSERT_NE(popped, nullptr); + popped->Run(); + ASSERT_TRUE(ran); + ASSERT_TRUE(queue.Empty()); +} + +TEST(WorkQueueTest, BecomesEmptyOnPopBack) { + WorkQueue queue; + bool ran = false; + queue.Add([&ran] { ran = true; }); + ASSERT_FALSE(queue.Empty()); + EventEngine::Closure* closure = queue.PopBack(); + ASSERT_NE(closure, nullptr); + closure->Run(); + ASSERT_TRUE(ran); + ASSERT_TRUE(queue.Empty()); +} + +TEST(WorkQueueTest, PopFrontIsFIFO) { + WorkQueue queue; + int flag = 0; + queue.Add([&flag] { flag |= 1; }); + queue.Add([&flag] { flag |= 2; }); + queue.PopFront()->Run(); + EXPECT_TRUE(flag & 1); + EXPECT_FALSE(flag & 2); + queue.PopFront()->Run(); + EXPECT_TRUE(flag & 1); + EXPECT_TRUE(flag & 2); + ASSERT_TRUE(queue.Empty()); +} + +TEST(WorkQueueTest, PopBackIsLIFO) { + WorkQueue queue; + int flag = 0; + queue.Add([&flag] { flag |= 1; }); + queue.Add([&flag] { flag |= 2; }); + queue.PopBack()->Run(); + EXPECT_FALSE(flag & 1); + EXPECT_TRUE(flag & 2); + queue.PopBack()->Run(); + EXPECT_TRUE(flag & 1); + EXPECT_TRUE(flag & 2); + ASSERT_TRUE(queue.Empty()); +} + +TEST(WorkQueueTest, OldestEnqueuedTimestampIsSane) { + WorkQueue queue; + ASSERT_EQ(queue.OldestEnqueuedTimestamp(), grpc_core::Timestamp::InfPast()); + queue.Add([] {}); + ASSERT_LE(queue.OldestEnqueuedTimestamp(), grpc_core::Timestamp::Now()); + auto* popped = queue.PopFront(); + ASSERT_EQ(queue.OldestEnqueuedTimestamp(), grpc_core::Timestamp::InfPast()); + // prevent leaks by executing or deleting the closure + delete popped; +} + +TEST(WorkQueueTest, OldestEnqueuedTimestampOrderingIsCorrect) { + WorkQueue queue; + AnyInvocableClosure closure([] {}); + queue.Add(&closure); + absl::SleepFor(absl::Milliseconds(2)); + queue.Add(&closure); + absl::SleepFor(absl::Milliseconds(2)); + queue.Add(&closure); + absl::SleepFor(absl::Milliseconds(2)); + auto oldest_ts = queue.OldestEnqueuedTimestamp(); + ASSERT_LE(oldest_ts, grpc_core::Timestamp::Now()); + // pop the oldest, and ensure the next oldest is younger + EventEngine::Closure* popped = queue.PopFront(); + ASSERT_NE(popped, nullptr); + auto second_oldest_ts = queue.OldestEnqueuedTimestamp(); + ASSERT_GT(second_oldest_ts, oldest_ts); + // pop the oldest, and ensure the last one is youngest + popped = queue.PopFront(); + ASSERT_NE(popped, nullptr); + auto youngest_ts = queue.OldestEnqueuedTimestamp(); + ASSERT_GT(youngest_ts, second_oldest_ts); + ASSERT_GT(youngest_ts, oldest_ts); +} + +TEST(WorkQueueTest, ThreadedStress) { + WorkQueue queue; + constexpr int thd_count = 33; + constexpr int element_count_per_thd = 3333; + std::vector threads; + threads.reserve(thd_count); + class TestClosure : public EventEngine::Closure { + public: + void Run() override { delete this; } + }; + for (int i = 0; i < thd_count; i++) { + threads.emplace_back([&] { + for (int j = 0; j < element_count_per_thd; j++) { + queue.Add(new TestClosure()); + } + int run_count = 0; + while (run_count < element_count_per_thd) { + if (auto* c = queue.PopFront()) { + c->Run(); + ++run_count; + } + } + }); + } + for (auto& thd : threads) thd.join(); + EXPECT_TRUE(queue.Empty()); +} + +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + auto result = RUN_ALL_TESTS(); + return result; +} diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index eb53a8877d8..13e42712a88 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -389,3 +389,23 @@ grpc_cc_test( ], deps = [":callback_streaming_ping_pong_h"], ) + +grpc_cc_test( + name = "bm_work_queue", + srcs = ["bm_work_queue.cc"], + args = grpc_benchmark_args(), + external_deps = ["benchmark"], + tags = [ + "manual", + "no_windows", + "notap", + ], + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:common_event_engine_closures", + "//:event_engine_work_queue", + "//:gpr", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/cpp/microbenchmarks/bm_work_queue.cc b/test/cpp/microbenchmarks/bm_work_queue.cc new file mode 100644 index 00000000000..d0ed7fb4123 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_work_queue.cc @@ -0,0 +1,313 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include +#include +#include + +// ensure assert() is enabled +#undef NDEBUG +#include + +#include + +#include + +#include "src/core/lib/event_engine/common_closures.h" +#include "src/core/lib/event_engine/work_queue.h" +#include "test/core/util/test_config.h" + +namespace { + +using ::grpc_event_engine::experimental::AnyInvocableClosure; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::WorkQueue; + +grpc_core::Mutex globalMu; +std::vector* globalWorkQueueList; +std::vector*>* globalDequeList; +std::vector* globalDequeMutexList; + +void GlobalSetup(const benchmark::State& state) { + // called for every test, resets all state + globalWorkQueueList = new std::vector(); + globalWorkQueueList->reserve(state.threads()); + globalDequeList = new std::vector*>(); + globalDequeList->reserve(state.threads()); + globalDequeMutexList = new std::vector( + std::vector(state.threads())); +} + +void GlobalTeardown(const benchmark::State& /* state */) { + // called for every test, resets all state + delete globalWorkQueueList; + delete globalDequeList; + delete globalDequeMutexList; +} + +void BM_WorkQueueIntptrPopFront(benchmark::State& state) { + WorkQueue queue; + grpc_event_engine::experimental::AnyInvocableClosure closure([] {}); + int element_count = state.range(0); + for (auto _ : state) { + int cnt = 0; + for (int i = 0; i < element_count; i++) queue.Add(&closure); + absl::optional popped; + cnt = 0; + do { + popped = queue.PopFront(); + if (popped.has_value()) ++cnt; + } while (cnt < element_count); + } + state.counters["Added"] = element_count * state.iterations(); + state.counters["Popped"] = state.counters["Added"]; + state.counters["Steal Rate"] = + benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); +} +BENCHMARK(BM_WorkQueueIntptrPopFront) + ->Setup(GlobalSetup) + ->Teardown(GlobalTeardown) + ->Range(1, 512) + ->UseRealTime() + ->MeasureProcessCPUTime(); + +void BM_MultithreadedWorkQueuePopBack(benchmark::State& state) { + if (state.thread_index() == 0) (*globalWorkQueueList)[0] = new WorkQueue(); + AnyInvocableClosure closure([] {}); + int element_count = state.range(0); + for (auto _ : state) { + int cnt = 0; + auto* queue = (*globalWorkQueueList)[0]; + for (int i = 0; i < element_count; i++) queue->Add(&closure); + absl::optional popped; + cnt = 0; + do { + popped = queue->PopBack(); + if (popped.has_value()) ++cnt; + } while (cnt < element_count); + } + state.counters["Added"] = element_count * state.iterations(); + state.counters["Popped"] = state.counters["Added"]; + state.counters["Steal Rate"] = + benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); + if (state.thread_index() == 0) { + delete (*globalWorkQueueList)[0]; + } +} +BENCHMARK(BM_MultithreadedWorkQueuePopBack) + ->Setup(GlobalSetup) + ->Teardown(GlobalTeardown) + ->Range(1, 512) + ->UseRealTime() + ->MeasureProcessCPUTime() + ->Threads(1) + ->Threads(4) + ->ThreadPerCpu(); + +void BM_WorkQueueClosureExecution(benchmark::State& state) { + WorkQueue queue; + int element_count = state.range(0); + int run_count = 0; + grpc_event_engine::experimental::AnyInvocableClosure closure( + [&run_count] { ++run_count; }); + for (auto _ : state) { + for (int i = 0; i < element_count; i++) queue.Add(&closure); + do { + queue.PopFront()->Run(); + } while (run_count < element_count); + run_count = 0; + } + state.counters["Added"] = element_count * state.iterations(); + state.counters["Popped"] = state.counters["Added"]; + state.counters["Steal Rate"] = + benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); +} +BENCHMARK(BM_WorkQueueClosureExecution) + ->Range(8, 128) + ->UseRealTime() + ->MeasureProcessCPUTime(); + +void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) { + WorkQueue queue; + int element_count = state.range(0); + int run_count = 0; + for (auto _ : state) { + for (int i = 0; i < element_count; i++) { + queue.Add([&run_count] { ++run_count; }); + } + do { + queue.PopFront()->Run(); + } while (run_count < element_count); + run_count = 0; + } + state.counters["Added"] = element_count * state.iterations(); + state.counters["Popped"] = state.counters["Added"]; + state.counters["Steal Rate"] = + benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); +} +BENCHMARK(BM_WorkQueueAnyInvocableExecution) + ->Range(8, 128) + ->UseRealTime() + ->MeasureProcessCPUTime(); + +void BM_StdDequeLIFO(benchmark::State& state) { + if (state.thread_index() == 0) { + (*globalDequeList)[0] = new std::deque(); + } + auto& mu = (*globalDequeMutexList)[0]; + int element_count = state.range(0); + AnyInvocableClosure closure([] {}); + for (auto _ : state) { + auto* queue = (*globalDequeList)[0]; + for (int i = 0; i < element_count; i++) { + grpc_core::MutexLock lock(&mu); + queue->emplace_back(&closure); + } + for (int i = 0; i < element_count; i++) { + grpc_core::MutexLock lock(&mu); + EventEngine::Closure* popped = queue->back(); + queue->pop_back(); + assert(popped != nullptr); + } + } + state.counters["Added"] = element_count * state.iterations(); + state.counters["Popped"] = state.counters["Added"]; + state.counters["Steal Rate"] = + benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); + if (state.thread_index() == 0) { + delete (*globalDequeList)[0]; + } +} +BENCHMARK(BM_StdDequeLIFO) + ->Setup(GlobalSetup) + ->Teardown(GlobalTeardown) + ->Range(1, 512) + ->UseRealTime() + ->MeasureProcessCPUTime() + ->Threads(1) + ->Threads(4) + ->ThreadPerCpu(); + +void PerThreadArguments(benchmark::internal::Benchmark* b) { + b->Setup(GlobalSetup) + ->Teardown(GlobalTeardown) + ->ArgsProduct({/*pop_attempts=*/{10, 50, 250}, + /*pct_fill=*/{2, 10, 50}}) + ->UseRealTime() + ->MeasureProcessCPUTime() + ->Threads(10) + ->ThreadPerCpu(); +} + +void BM_WorkQueuePerThread(benchmark::State& state) { + WorkQueue local_queue; + { + grpc_core::MutexLock lock(&globalMu); + (*globalWorkQueueList)[state.thread_index()] = &local_queue; + } + AnyInvocableClosure closure([] {}); + int element_count = state.range(0); + float pct_fill = state.range(1) / 100.0; + for (auto _ : state) { + // sparsely populate a queue + for (int i = 0; i < std::ceil(element_count * pct_fill); i++) { + local_queue.Add(&closure); + } + // attempt to pop from all thread queues `element_count` times + int pop_attempts = 0; + auto iq = globalWorkQueueList->begin(); + while (pop_attempts++ < element_count) { + // may not get a value if the queue being looked at from another thread + (*iq)->PopBack(); + if (iq == globalWorkQueueList->end()) { + iq = globalWorkQueueList->begin(); + } else { + iq++; + }; + } + } + state.counters["Added"] = + std::ceil(element_count * pct_fill) * state.iterations(); + state.counters["Steal Attempts"] = element_count * state.iterations(); + state.counters["Steal Rate"] = benchmark::Counter( + state.counters["Steal Attempts"], benchmark::Counter::kIsRate); + if (state.thread_index() == 0) { + for (auto* queue : *globalWorkQueueList) { + assert(queue->Empty()); + } + } +} +BENCHMARK(BM_WorkQueuePerThread)->Apply(PerThreadArguments); + +void BM_StdDequePerThread(benchmark::State& state) { + std::deque local_queue; + (*globalDequeList)[state.thread_index()] = &local_queue; + int element_count = state.range(0); + float pct_fill = state.range(1) / 100.0; + AnyInvocableClosure closure([] {}); + auto& local_mu = (*globalDequeMutexList)[state.thread_index()]; + for (auto _ : state) { + // sparsely populate a queue + for (int i = 0; i < std::ceil(element_count * pct_fill); i++) { + grpc_core::MutexLock lock(&local_mu); + local_queue.emplace_back(&closure); + } + int pop_attempts = 0; + auto iq = globalDequeList->begin(); + auto mu = globalDequeMutexList->begin(); + while (pop_attempts++ < element_count) { + { + grpc_core::MutexLock lock(&*mu); + if (!(*iq)->empty()) { + assert((*iq)->back() != nullptr); + (*iq)->pop_back(); + } + } + if (iq == globalDequeList->end()) { + iq = globalDequeList->begin(); + mu = globalDequeMutexList->begin(); + } else { + ++iq; + ++mu; + }; + } + } + state.counters["Added"] = + std::ceil(element_count * pct_fill) * state.iterations(); + state.counters["Steal Attempts"] = element_count * state.iterations(); + state.counters["Steal Rate"] = benchmark::Counter( + state.counters["Steal Attempts"], benchmark::Counter::kIsRate); + if (state.thread_index() == 0) { + for (auto* queue : *globalDequeList) { + assert(queue->empty()); + } + } +} +BENCHMARK(BM_StdDequePerThread)->Apply(PerThreadArguments); + +} // namespace + +// Some distros have RunSpecifiedBenchmarks under the benchmark namespace, +// and others do not. This allows us to support both modes. +namespace benchmark { +void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } +} // namespace benchmark + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + ::benchmark::Initialize(&argc, argv); + benchmark::RunTheBenchmarksNamespaced(); + return 0; +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 0c76c081548..b7e38ea539e 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7893,6 +7893,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "work_queue_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,