Performant thread-safe Work Queue (#30821)

* WorkQueue

* weaken the large obj stress test for Windows; documentation

* update comment

* Add WorkQueue microbenchmark. Results below ...

------------------------------------------------------------------------------------------
Benchmark                                Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------
BM_WorkQueueIntptrPopFront/1           297 ns          297 ns      2343500 items_per_second=3.3679M/s
BM_WorkQueueIntptrPopFront/8          7022 ns         7020 ns        99356 items_per_second=1.13956M/s
BM_WorkQueueIntptrPopFront/64        59606 ns        59590 ns        11770 items_per_second=1074k/s
BM_WorkQueueIntptrPopFront/512      477867 ns       477748 ns         1469 items_per_second=1071.7k/s
BM_WorkQueueIntptrPopFront/4096    3815786 ns      3814925 ns          184 items_per_second=1073.68k/s
I0902 19:05:22.138022069      12 test_config.cc:194]         TestEnvironment ends
================================================================================

* use int64_t for times. 0 performance change

------------------------------------------------------------------------------------------
Benchmark                                Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------
BM_WorkQueueIntptrPopFront/1           277 ns          277 ns      2450292 items_per_second=3.60967M/s
BM_WorkQueueIntptrPopFront/8          6718 ns         6716 ns       105497 items_per_second=1.19126M/s
BM_WorkQueueIntptrPopFront/64        56428 ns        56401 ns        12268 items_per_second=1.13474M/s
BM_WorkQueueIntptrPopFront/512      458953 ns       458817 ns         1550 items_per_second=1.11591M/s
BM_WorkQueueIntptrPopFront/4096    3686357 ns      3685120 ns          191 items_per_second=1.1115M/s
I0902 19:25:31.549382949      12 test_config.cc:194]         TestEnvironment ends
================================================================================

* add PopBack tests: same performance profile exactly

* use Mutex instead of Spinlock

It's safer, and so far equally performant in opt-build benchmarks.
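
For context, a minimal sketch of the non-blocking pop pattern in question (illustrative names only; the real logic is WorkQueue::TryLockAndPop in work_queue.cc below): under contention the pop returns nullptr instead of blocking on the Mutex, so the calling worker can go look at another queue.

// Illustrative sketch, not the actual WorkQueue internals.
#include <deque>

#include <grpc/event_engine/event_engine.h>

#include "src/core/lib/gprpp/sync.h"

using ::grpc_event_engine::experimental::EventEngine;

EventEngine::Closure* TryPopFrontNonBlocking(
    grpc_core::Mutex& mu, std::deque<EventEngine::Closure*>& elements) {
  // Someone else is already popping; do not wait, let the caller move on.
  if (!mu.TryLock()) return nullptr;
  EventEngine::Closure* closure = nullptr;
  if (!elements.empty()) {
    closure = elements.front();
    elements.pop_front();
  }
  mu.Unlock();
  return closure;
}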

* add a std::deque benchmark for comparison; the plain deque is faster on all of these tests.

* Add sparsely-populated multi-threaded benchmarks.

* fix

* fix

* refactor to help thread safety analysis

* Specialize WorkQueue for Closure*s and AnyInvocables
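
A rough caller-side sketch of the two specializations (AddBothKinds is a hypothetical helper; the ownership notes come from the work_queue.h comments below):

// Illustrative only: the two ways work can be handed to a WorkQueue.
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/work_queue.h"

using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::WorkQueue;

void AddBothKinds(WorkQueue& queue, AnyInvocableClosure* closure) {
  // Raw Closure*: not owned by the queue, so the caller must keep it alive
  // until it has been popped and Run().
  queue.Add(closure);
  // AnyInvocable: wrapped internally in a self-deleting closure, which frees
  // itself when the popped closure runs.
  queue.Add([] { /* do some work */ });
}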

* remove unused callback storage

* add single-threaded benchmark for closure vs invocable

* sanitize

* missing include

* move bm_work_queue to microbenchmarks so it isn't exported
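
For reference, with the benchmark in its new Bazel package it can presumably be run with something like the line below (the target name comes from test/cpp/microbenchmarks/BUILD further down; --benchmark_filter is a standard Google Benchmark flag, and the optimization flag is a local-setup assumption):

bazel run -c opt //test/cpp/microbenchmarks:bm_work_queue -- --benchmark_filter=BM_WorkQueue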

* s/workqueue/work_queue/g

* use nullptr instead of optionals for popped closures
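
A hedged sketch of what that looks like for callers (DrainFront is a hypothetical helper, mirroring the tests below): popped closures are plain pointers, and nullptr can mean either "empty" or "briefly contended", so the result is checked before Run().

// Illustrative only.
#include "src/core/lib/event_engine/work_queue.h"

using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::WorkQueue;

void DrainFront(WorkQueue& queue) {
  while (!queue.Empty()) {
    EventEngine::Closure* closure = queue.PopFront();
    if (closure == nullptr) continue;  // lost a race with another worker
    closure->Run();  // self-deleting closures clean themselves up here
  }
}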

* reviewer test suggestion

* private things are private

* add a work_queue fuzzer

Ran for 10 minutes @ 42 jobs @ 42 workers. Zero failures.

Checked in a selection of 100 good seeds after merging the thousands of
results.
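
For reference, the 42 jobs / 42 workers / 10 minutes run maps onto standard libFuzzer flags. With the Bazel target defined below, the invocation would look roughly like the line that follows; the --config=asan-fuzzer flag is an assumption about the local .bazelrc and may differ:

bazel run --config=asan-fuzzer //test/core/event_engine/work_queue:work_queue_fuzzer -- -jobs=42 -workers=42 -max_total_time=600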

* fix

* fix header guards

* nuke the corpora

* feedback

* sanitize

* Timestamp::Now

* fix

* fuzzers do not work on Windows

* Windows does not like multithreaded benchmark tests
Author: AJ Heller (committed via GitHub)
Commit: fb14fdf0e0 (parent: f15ba1ffc7)
Files changed (13), with changed line counts:
  1. BUILD (21)
  2. CMakeLists.txt (37)
  3. build_autogenerated.yaml (12)
  4. src/core/lib/event_engine/work_queue.cc (194)
  5. src/core/lib/event_engine/work_queue.h (121)
  6. test/core/event_engine/work_queue/BUILD (51)
  7. test/core/event_engine/work_queue/corpora/empty (0)
  8. test/core/event_engine/work_queue/work_queue_fuzzer.cc (137)
  9. test/core/event_engine/work_queue/work_queue_fuzzer.proto (48)
  10. test/core/event_engine/work_queue/work_queue_test.cc (170)
  11. test/cpp/microbenchmarks/BUILD (20)
  12. test/cpp/microbenchmarks/bm_work_queue.cc (313)
  13. tools/run_tests/generated/tests.json (24)

BUILD

@@ -2427,6 +2427,27 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "event_engine_work_queue",
srcs = [
"src/core/lib/event_engine/work_queue.cc",
],
hdrs = [
"src/core/lib/event_engine/work_queue.h",
],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/types:optional",
],
deps = [
"common_event_engine_closures",
"event_engine_base_hdrs",
"gpr",
"time",
],
)
grpc_cc_library(
name = "event_engine_threaded_executor",
srcs = [

CMakeLists.txt (generated)

@@ -1242,6 +1242,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx window_overflow_bad_client_test)
add_dependencies(buildtests_cxx wire_reader_test)
add_dependencies(buildtests_cxx wire_writer_test)
add_dependencies(buildtests_cxx work_queue_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx work_serializer_test)
endif()
@@ -19740,6 +19741,42 @@ target_link_libraries(wire_writer_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(work_queue_test
src/core/lib/event_engine/work_queue.cc
test/core/event_engine/work_queue/work_queue_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(work_queue_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(work_queue_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util_unsecure
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

build_autogenerated.yaml (generated)

@@ -10758,6 +10758,18 @@ targets:
- absl/cleanup:cleanup
- grpc_test_util
uses_polling: false
- name: work_queue_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/work_queue.h
src:
- src/core/lib/event_engine/work_queue.cc
- test/core/event_engine/work_queue/work_queue_test.cc
deps:
- grpc_test_util_unsecure
- name: work_serializer_test
gtest: true
build: test

src/core/lib/event_engine/work_queue.cc (new file)

@@ -0,0 +1,194 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/work_queue.h"
#include <cstdint>
#include <memory>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "src/core/lib/event_engine/common_closures.h"
namespace grpc_event_engine {
namespace experimental {
// ------ WorkQueue::Storage --------------------------------------------------
WorkQueue::Storage::Storage(EventEngine::Closure* closure) noexcept
: closure_(closure),
enqueued_(
grpc_core::Timestamp::Now().milliseconds_after_process_epoch()) {}
WorkQueue::Storage::Storage(absl::AnyInvocable<void()> callback) noexcept
: closure_(SelfDeletingClosure::Create(std::move(callback))),
enqueued_(
grpc_core::Timestamp::Now().milliseconds_after_process_epoch()) {}
WorkQueue::Storage::Storage(Storage&& other) noexcept
: closure_(other.closure_), enqueued_(other.enqueued_) {}
WorkQueue::Storage& WorkQueue::Storage::operator=(Storage&& other) noexcept {
std::swap(closure_, other.closure_);
std::swap(enqueued_, other.enqueued_);
return *this;
}
EventEngine::Closure* WorkQueue::Storage::closure() { return closure_; }
// ------ WorkQueue -----------------------------------------------------------
// Returns whether the queue is empty
bool WorkQueue::Empty() const {
return (most_recent_element_enqueue_timestamp_.load(
std::memory_order_relaxed) == kInvalidTimestamp &&
oldest_enqueued_timestamp_.load(std::memory_order_relaxed) ==
kInvalidTimestamp);
}
grpc_core::Timestamp WorkQueue::OldestEnqueuedTimestamp() const {
int64_t front_of_queue_timestamp =
oldest_enqueued_timestamp_.load(std::memory_order_relaxed);
if (front_of_queue_timestamp != kInvalidTimestamp) {
return grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
front_of_queue_timestamp);
}
int64_t most_recent_millis =
most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed);
if (most_recent_millis == kInvalidTimestamp) {
return grpc_core::Timestamp::InfPast();
}
return grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
most_recent_millis);
}
EventEngine::Closure* WorkQueue::PopFront() ABSL_LOCKS_EXCLUDED(mu_) {
if (oldest_enqueued_timestamp_.load(std::memory_order_relaxed) !=
kInvalidTimestamp) {
EventEngine::Closure* t = TryLockAndPop(/*front=*/true);
if (t != nullptr) return t;
}
if (most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed) !=
kInvalidTimestamp) {
return TryPopMostRecentElement();
}
return nullptr;
}
EventEngine::Closure* WorkQueue::PopBack() {
if (most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed) !=
kInvalidTimestamp) {
return TryPopMostRecentElement();
}
if (oldest_enqueued_timestamp_.load(std::memory_order_relaxed) !=
kInvalidTimestamp) {
EventEngine::Closure* t = TryLockAndPop(/*front=*/false);
if (t != nullptr) return t;
}
return nullptr;
}
void WorkQueue::Add(EventEngine::Closure* closure) {
AddInternal(Storage(closure));
}
void WorkQueue::Add(absl::AnyInvocable<void()> invocable) {
AddInternal(Storage(std::move(invocable)));
}
void WorkQueue::AddInternal(Storage&& storage) {
Storage previous_most_recent;
int64_t previous_ts;
{
absl::optional<Storage> tmp_element;
{
grpc_core::MutexLock lock(&most_recent_element_lock_);
previous_ts = most_recent_element_enqueue_timestamp_.exchange(
storage.enqueued(), std::memory_order_relaxed);
tmp_element = std::exchange(most_recent_element_, std::move(storage));
}
if (!tmp_element.has_value() || previous_ts == kInvalidTimestamp) return;
previous_most_recent = std::move(*tmp_element);
}
grpc_core::MutexLock lock(&mu_);
if (elements_.empty()) {
oldest_enqueued_timestamp_.store(previous_ts, std::memory_order_relaxed);
}
elements_.push_back(std::move(previous_most_recent));
}
EventEngine::Closure* WorkQueue::TryLockAndPop(bool front)
ABSL_LOCKS_EXCLUDED(mu_) {
// Do not block the worker if there are other workers trying to pop
// tasks from this queue.
if (!mu_.TryLock()) return nullptr;
auto ret = PopLocked(front);
mu_.Unlock();
return ret;
}
EventEngine::Closure* WorkQueue::PopLocked(bool front)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (GPR_UNLIKELY(elements_.empty())) {
if (most_recent_element_enqueue_timestamp_.load(
std::memory_order_relaxed) == kInvalidTimestamp) {
return nullptr;
}
if (!most_recent_element_lock_.TryLock()) return nullptr;
absl::optional<Storage> ret;
if (GPR_LIKELY(most_recent_element_.has_value())) {
most_recent_element_enqueue_timestamp_.store(kInvalidTimestamp,
std::memory_order_relaxed);
ret = std::exchange(most_recent_element_, absl::nullopt);
}
most_recent_element_lock_.Unlock();
return ret->closure();
}
// the queue has elements, let's pop one and update timestamps
Storage ret_s;
if (front) {
ret_s = std::move(elements_.front());
elements_.pop_front();
} else {
ret_s = std::move(elements_.back());
elements_.pop_back();
}
if (elements_.empty()) {
oldest_enqueued_timestamp_.store(kInvalidTimestamp,
std::memory_order_relaxed);
} else if (front) {
oldest_enqueued_timestamp_.store(elements_.front().enqueued(),
std::memory_order_relaxed);
}
return ret_s.closure();
}
EventEngine::Closure* WorkQueue::TryPopMostRecentElement() {
if (!most_recent_element_lock_.TryLock()) return nullptr;
if (GPR_UNLIKELY(!most_recent_element_.has_value())) {
most_recent_element_lock_.Unlock();
return nullptr;
}
most_recent_element_enqueue_timestamp_.store(kInvalidTimestamp,
std::memory_order_relaxed);
absl::optional<Storage> tmp =
std::exchange(most_recent_element_, absl::nullopt);
most_recent_element_lock_.Unlock();
return tmp->closure();
}
} // namespace experimental
} // namespace grpc_event_engine

src/core/lib/event_engine/work_queue.h (new file)

@@ -0,0 +1,121 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <atomic>
#include <deque>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace experimental {
// A fast work queue based lightly on an internal Google implementation.
//
// This uses atomics to access the most recent element in the queue, making it
// fast for LIFO operations. Accessing the oldest (next) element requires taking
// a mutex lock.
class WorkQueue {
public:
// comparable to Timestamp::milliseconds_after_process_epoch()
static const int64_t kInvalidTimestamp = -1;
WorkQueue() = default;
// Returns whether the queue is empty
bool Empty() const;
// Returns the Timestamp of when the oldest queued element was enqueued.
grpc_core::Timestamp OldestEnqueuedTimestamp() const;
// Returns the next (oldest) element from the queue, or nullptr if empty
EventEngine::Closure* PopFront() ABSL_LOCKS_EXCLUDED(mu_);
// Returns the most recent element from the queue, or nullptr if empty
EventEngine::Closure* PopBack();
// Adds a closure to the back of the queue
void Add(EventEngine::Closure* closure);
// Wraps an AnyInvocable and adds it to the back of the queue
void Add(absl::AnyInvocable<void()> invocable);
private:
class Storage {
public:
Storage() = default;
// Take a non-owned Closure*
// Requires an exec_ctx on the stack
// TODO(ctiller): replace with an alternative time source
explicit Storage(EventEngine::Closure* closure) noexcept;
// Wrap an AnyInvocable into a Closure.
// The closure must be executed or explicitly deleted to prevent memory
// leaks. Requires an exec_ctx on the stack
// TODO(ctiller): replace with an alternative time source
explicit Storage(absl::AnyInvocable<void()> callback) noexcept;
~Storage() = default;
// not copyable
Storage(const Storage&) = delete;
Storage& operator=(const Storage&) = delete;
// moveable
Storage(Storage&& other) noexcept;
Storage& operator=(Storage&& other) noexcept;
// The enqueue timestamp, in milliseconds after the process epoch.
int64_t enqueued() const { return enqueued_; }
// Get the stored closure, or wrapped AnyInvocable
EventEngine::Closure* closure();
private:
EventEngine::Closure* closure_ = nullptr;
int64_t enqueued_ = kInvalidTimestamp;
};
// Attempts to pop from the front of the queue (oldest).
// This will return nullptr if the queue is empty, or if other workers
// are already attempting to pop from this queue.
EventEngine::Closure* TryLockAndPop(bool front) ABSL_LOCKS_EXCLUDED(mu_);
// Internal implementation, helps with thread safety analysis in TryLockAndPop
EventEngine::Closure* PopLocked(bool front)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Attempts to pop from the back of the queue (most recent).
// This will return nullptr if the queue is empty, or if other workers
// are already attempting to pop from this queue.
EventEngine::Closure* TryPopMostRecentElement();
// Common code for the Add methods
void AddInternal(Storage&& storage);
// The managed items in the queue
std::deque<Storage> elements_ ABSL_GUARDED_BY(mu_);
// The most recently enqueued element. This is reserved from work stealing
absl::optional<Storage> most_recent_element_
ABSL_GUARDED_BY(most_recent_element_lock_);
grpc_core::Mutex ABSL_ACQUIRED_AFTER(mu_) most_recent_element_lock_;
// TODO(hork): consider ABSL_CACHELINE_ALIGNED
std::atomic<int64_t> most_recent_element_enqueue_timestamp_{
kInvalidTimestamp};
std::atomic<int64_t> oldest_enqueued_timestamp_{kInvalidTimestamp};
grpc_core::Mutex mu_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H

test/core/event_engine/work_queue/BUILD (new file)

@@ -0,0 +1,51 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer")
licenses(["notice"])
grpc_package(
name = "test/core/event_engine/work_queue",
visibility = "tests",
)
grpc_cc_test(
name = "work_queue_test",
srcs = ["work_queue_test.cc"],
external_deps = ["gtest"],
deps = [
"//:common_event_engine_closures",
"//:event_engine_work_queue",
"//:exec_ctx",
"//:gpr_platform",
"//test/core/util:grpc_test_util_unsecure",
],
)
grpc_proto_fuzzer(
name = "work_queue_fuzzer",
srcs = ["work_queue_fuzzer.cc"],
corpus = "corpora",
language = "C++",
proto = "work_queue_fuzzer.proto",
tags = ["no_windows"],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:event_engine_work_queue",
"//test/core/util:grpc_test_util",
],
)

test/core/event_engine/work_queue/work_queue_fuzzer.cc (new file)

@@ -0,0 +1,137 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <deque>
#include "absl/container/flat_hash_map.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/work_queue.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/event_engine/work_queue/work_queue_fuzzer.pb.h"
bool squelch = true;
bool leak_check = true;
namespace grpc_event_engine {
namespace experimental {
class WorkQueueFuzzer {
public:
WorkQueueFuzzer() { CheckEqual(); };
~WorkQueueFuzzer() { CheckEqual(); };
void Run(const work_queue_fuzzer::Action& action) {
switch (action.action_type_case()) {
case work_queue_fuzzer::Action::kAdd: {
if (action.add().type() == work_queue_fuzzer::CALLBACK_TYPE_CLOSURE) {
work_queue_.Add(CreateClosure(action.add().key()));
deque_.push_back(CreateClosure(action.add().key()));
} else {
work_queue_.Add(CreateInvocable(action.add().key()));
deque_.push_back(CreateClosureWrappedInvocable(action.add().key()));
}
} break;
case work_queue_fuzzer::Action::kPopFront: {
// pop front closures, executing both to check they are a pair
auto* wq_c = work_queue_.PopFront();
if (wq_c == nullptr) {
if (!work_queue_.Empty() || !deque_.empty()) abort();
} else {
auto* dq_c = deque_.front();
deque_.pop_front();
wq_c->Run();
dq_c->Run();
}
} break;
case work_queue_fuzzer::Action::kPopBack: {
// pop back closures, executing both to check they are a pair
auto* wq_c = work_queue_.PopBack();
if (wq_c == nullptr) {
if (!work_queue_.Empty() || !deque_.empty()) abort();
} else {
auto* dq_c = deque_.back();
deque_.pop_back();
wq_c->Run();
dq_c->Run();
}
} break;
case work_queue_fuzzer::Action::kEmpty: {
if (work_queue_.Empty() != deque_.empty()) abort();
} break;
case work_queue_fuzzer::Action::ACTION_TYPE_NOT_SET:
break;
};
}
private:
EventEngine::Closure* CreateClosure(int key) {
return SelfDeletingClosure::Create([key, this] {
if (last_executed_key_.has_value()) {
if (*last_executed_key_ != key) abort();
last_executed_key_.reset();
} else {
last_executed_key_ = key;
}
});
}
absl::AnyInvocable<void()> CreateInvocable(int key) {
return absl::AnyInvocable<void()>([key, this] {
if (last_executed_key_.has_value()) {
if (*last_executed_key_ != key) abort();
last_executed_key_.reset();
} else {
last_executed_key_ = key;
}
});
}
EventEngine::Closure* CreateClosureWrappedInvocable(int key) {
auto invocable = CreateInvocable(key);
return SelfDeletingClosure::Create(
[invocable = std::move(invocable)]() mutable { invocable(); });
}
void CheckEqual() {
while (auto* wq_c = work_queue_.PopBack()) {
if (deque_.empty()) abort();
auto* dq_c = deque_.back();
deque_.pop_back();
wq_c->Run();
dq_c->Run();
}
}
WorkQueue work_queue_;
std::deque<EventEngine::Closure*> deque_;
// Closures are always added in pairs and checked in pairs.
// When checking, each popped closure encounters one of these situations:
// A) it is the first of a pair, denoted by an empty last_executed_key_, so
// it sets last_executed_key_ to its own key.
// B) last_executed_key_ is set, so its value must match this closure's own
// key to assert that it is the other part of the pair. last_executed_key_
// is then reset.
absl::optional<int> last_executed_key_;
};
} // namespace experimental
} // namespace grpc_event_engine
DEFINE_PROTO_FUZZER(const work_queue_fuzzer::Msg& msg) {
for (const auto& action : msg.actions()) {
grpc_event_engine::experimental::WorkQueueFuzzer().Run(action);
}
}

test/core/event_engine/work_queue/work_queue_fuzzer.proto (new file)

@@ -0,0 +1,48 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package work_queue_fuzzer;
enum CallbackType {
CALLBACK_TYPE_CLOSURE = 0;
CALLBACK_TYPE_ANY_INVOCABLE = 1;
}
message Add {
CallbackType type = 1;
int32 key = 2;
}
message PopFront {
}
message PopBack {
}
message Empty {
}
message Action {
oneof action_type {
Add add = 1;
PopFront pop_front = 2;
PopBack pop_back = 3;
Empty empty = 4;
}
}
message Msg {
repeated Action actions = 1;
}
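
For illustration, a hand-written seed for this schema could look like the following text-format Msg (a hypothetical example, not one of the seeds that were checked in and later removed):

actions { add { type: CALLBACK_TYPE_CLOSURE key: 7 } }
actions { add { type: CALLBACK_TYPE_ANY_INVOCABLE key: 7 } }
actions { empty { } }
actions { pop_back { } }
actions { pop_front { } }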

test/core/event_engine/work_queue/work_queue_test.cc (new file)

@@ -0,0 +1,170 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/work_queue.h"
#include <thread>
#include <gtest/gtest.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "test/core/util/test_config.h"
namespace {
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::WorkQueue;
TEST(WorkQueueTest, StartsEmpty) {
WorkQueue queue;
ASSERT_TRUE(queue.Empty());
}
TEST(WorkQueueTest, TakesClosures) {
WorkQueue queue;
bool ran = false;
AnyInvocableClosure closure([&ran] { ran = true; });
queue.Add(&closure);
ASSERT_FALSE(queue.Empty());
EventEngine::Closure* popped = queue.PopFront();
ASSERT_NE(popped, nullptr);
popped->Run();
ASSERT_TRUE(ran);
ASSERT_TRUE(queue.Empty());
}
TEST(WorkQueueTest, TakesAnyInvocables) {
WorkQueue queue;
bool ran = false;
queue.Add([&ran] { ran = true; });
ASSERT_FALSE(queue.Empty());
EventEngine::Closure* popped = queue.PopFront();
ASSERT_NE(popped, nullptr);
popped->Run();
ASSERT_TRUE(ran);
ASSERT_TRUE(queue.Empty());
}
TEST(WorkQueueTest, BecomesEmptyOnPopBack) {
WorkQueue queue;
bool ran = false;
queue.Add([&ran] { ran = true; });
ASSERT_FALSE(queue.Empty());
EventEngine::Closure* closure = queue.PopBack();
ASSERT_NE(closure, nullptr);
closure->Run();
ASSERT_TRUE(ran);
ASSERT_TRUE(queue.Empty());
}
TEST(WorkQueueTest, PopFrontIsFIFO) {
WorkQueue queue;
int flag = 0;
queue.Add([&flag] { flag |= 1; });
queue.Add([&flag] { flag |= 2; });
queue.PopFront()->Run();
EXPECT_TRUE(flag & 1);
EXPECT_FALSE(flag & 2);
queue.PopFront()->Run();
EXPECT_TRUE(flag & 1);
EXPECT_TRUE(flag & 2);
ASSERT_TRUE(queue.Empty());
}
TEST(WorkQueueTest, PopBackIsLIFO) {
WorkQueue queue;
int flag = 0;
queue.Add([&flag] { flag |= 1; });
queue.Add([&flag] { flag |= 2; });
queue.PopBack()->Run();
EXPECT_FALSE(flag & 1);
EXPECT_TRUE(flag & 2);
queue.PopBack()->Run();
EXPECT_TRUE(flag & 1);
EXPECT_TRUE(flag & 2);
ASSERT_TRUE(queue.Empty());
}
TEST(WorkQueueTest, OldestEnqueuedTimestampIsSane) {
WorkQueue queue;
ASSERT_EQ(queue.OldestEnqueuedTimestamp(), grpc_core::Timestamp::InfPast());
queue.Add([] {});
ASSERT_LE(queue.OldestEnqueuedTimestamp(), grpc_core::Timestamp::Now());
auto* popped = queue.PopFront();
ASSERT_EQ(queue.OldestEnqueuedTimestamp(), grpc_core::Timestamp::InfPast());
// prevent leaks by executing or deleting the closure
delete popped;
}
TEST(WorkQueueTest, OldestEnqueuedTimestampOrderingIsCorrect) {
WorkQueue queue;
AnyInvocableClosure closure([] {});
queue.Add(&closure);
absl::SleepFor(absl::Milliseconds(2));
queue.Add(&closure);
absl::SleepFor(absl::Milliseconds(2));
queue.Add(&closure);
absl::SleepFor(absl::Milliseconds(2));
auto oldest_ts = queue.OldestEnqueuedTimestamp();
ASSERT_LE(oldest_ts, grpc_core::Timestamp::Now());
// pop the oldest, and ensure the next oldest is younger
EventEngine::Closure* popped = queue.PopFront();
ASSERT_NE(popped, nullptr);
auto second_oldest_ts = queue.OldestEnqueuedTimestamp();
ASSERT_GT(second_oldest_ts, oldest_ts);
// pop the oldest, and ensure the last one is youngest
popped = queue.PopFront();
ASSERT_NE(popped, nullptr);
auto youngest_ts = queue.OldestEnqueuedTimestamp();
ASSERT_GT(youngest_ts, second_oldest_ts);
ASSERT_GT(youngest_ts, oldest_ts);
}
TEST(WorkQueueTest, ThreadedStress) {
WorkQueue queue;
constexpr int thd_count = 33;
constexpr int element_count_per_thd = 3333;
std::vector<std::thread> threads;
threads.reserve(thd_count);
class TestClosure : public EventEngine::Closure {
public:
void Run() override { delete this; }
};
for (int i = 0; i < thd_count; i++) {
threads.emplace_back([&] {
for (int j = 0; j < element_count_per_thd; j++) {
queue.Add(new TestClosure());
}
int run_count = 0;
while (run_count < element_count_per_thd) {
if (auto* c = queue.PopFront()) {
c->Run();
++run_count;
}
}
});
}
for (auto& thd : threads) thd.join();
EXPECT_TRUE(queue.Empty());
}
} // namespace
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
auto result = RUN_ALL_TESTS();
return result;
}

test/cpp/microbenchmarks/BUILD

@@ -389,3 +389,23 @@ grpc_cc_test(
],
deps = [":callback_streaming_ping_pong_h"],
)
grpc_cc_test(
name = "bm_work_queue",
srcs = ["bm_work_queue.cc"],
args = grpc_benchmark_args(),
external_deps = ["benchmark"],
tags = [
"manual",
"no_windows",
"notap",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:common_event_engine_closures",
"//:event_engine_work_queue",
"//:gpr",
"//test/core/util:grpc_test_util",
],
)

test/cpp/microbenchmarks/bm_work_queue.cc (new file)

@@ -0,0 +1,313 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <cmath>
#include <deque>
#include <sstream>
// ensure assert() is enabled
#undef NDEBUG
#include <cassert>
#include <benchmark/benchmark.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/work_queue.h"
#include "test/core/util/test_config.h"
namespace {
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::WorkQueue;
grpc_core::Mutex globalMu;
std::vector<WorkQueue*>* globalWorkQueueList;
std::vector<std::deque<EventEngine::Closure*>*>* globalDequeList;
std::vector<grpc_core::Mutex>* globalDequeMutexList;
void GlobalSetup(const benchmark::State& state) {
// called for every test, resets all state
globalWorkQueueList = new std::vector<WorkQueue*>();
globalWorkQueueList->reserve(state.threads());
globalDequeList = new std::vector<std::deque<EventEngine::Closure*>*>();
globalDequeList->reserve(state.threads());
globalDequeMutexList = new std::vector<grpc_core::Mutex>(
std::vector<grpc_core::Mutex>(state.threads()));
}
void GlobalTeardown(const benchmark::State& /* state */) {
// called for every test, resets all state
delete globalWorkQueueList;
delete globalDequeList;
delete globalDequeMutexList;
}
void BM_WorkQueueIntptrPopFront(benchmark::State& state) {
WorkQueue queue;
grpc_event_engine::experimental::AnyInvocableClosure closure([] {});
int element_count = state.range(0);
for (auto _ : state) {
int cnt = 0;
for (int i = 0; i < element_count; i++) queue.Add(&closure);
absl::optional<EventEngine::Closure*> popped;
cnt = 0;
do {
popped = queue.PopFront();
if (popped.has_value()) ++cnt;
} while (cnt < element_count);
}
state.counters["Added"] = element_count * state.iterations();
state.counters["Popped"] = state.counters["Added"];
state.counters["Steal Rate"] =
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
}
BENCHMARK(BM_WorkQueueIntptrPopFront)
->Setup(GlobalSetup)
->Teardown(GlobalTeardown)
->Range(1, 512)
->UseRealTime()
->MeasureProcessCPUTime();
void BM_MultithreadedWorkQueuePopBack(benchmark::State& state) {
if (state.thread_index() == 0) (*globalWorkQueueList)[0] = new WorkQueue();
AnyInvocableClosure closure([] {});
int element_count = state.range(0);
for (auto _ : state) {
int cnt = 0;
auto* queue = (*globalWorkQueueList)[0];
for (int i = 0; i < element_count; i++) queue->Add(&closure);
absl::optional<EventEngine::Closure*> popped;
cnt = 0;
do {
popped = queue->PopBack();
if (popped.has_value()) ++cnt;
} while (cnt < element_count);
}
state.counters["Added"] = element_count * state.iterations();
state.counters["Popped"] = state.counters["Added"];
state.counters["Steal Rate"] =
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
if (state.thread_index() == 0) {
delete (*globalWorkQueueList)[0];
}
}
BENCHMARK(BM_MultithreadedWorkQueuePopBack)
->Setup(GlobalSetup)
->Teardown(GlobalTeardown)
->Range(1, 512)
->UseRealTime()
->MeasureProcessCPUTime()
->Threads(1)
->Threads(4)
->ThreadPerCpu();
void BM_WorkQueueClosureExecution(benchmark::State& state) {
WorkQueue queue;
int element_count = state.range(0);
int run_count = 0;
grpc_event_engine::experimental::AnyInvocableClosure closure(
[&run_count] { ++run_count; });
for (auto _ : state) {
for (int i = 0; i < element_count; i++) queue.Add(&closure);
do {
queue.PopFront()->Run();
} while (run_count < element_count);
run_count = 0;
}
state.counters["Added"] = element_count * state.iterations();
state.counters["Popped"] = state.counters["Added"];
state.counters["Steal Rate"] =
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
}
BENCHMARK(BM_WorkQueueClosureExecution)
->Range(8, 128)
->UseRealTime()
->MeasureProcessCPUTime();
void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) {
WorkQueue queue;
int element_count = state.range(0);
int run_count = 0;
for (auto _ : state) {
for (int i = 0; i < element_count; i++) {
queue.Add([&run_count] { ++run_count; });
}
do {
queue.PopFront()->Run();
} while (run_count < element_count);
run_count = 0;
}
state.counters["Added"] = element_count * state.iterations();
state.counters["Popped"] = state.counters["Added"];
state.counters["Steal Rate"] =
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
}
BENCHMARK(BM_WorkQueueAnyInvocableExecution)
->Range(8, 128)
->UseRealTime()
->MeasureProcessCPUTime();
void BM_StdDequeLIFO(benchmark::State& state) {
if (state.thread_index() == 0) {
(*globalDequeList)[0] = new std::deque<EventEngine::Closure*>();
}
auto& mu = (*globalDequeMutexList)[0];
int element_count = state.range(0);
AnyInvocableClosure closure([] {});
for (auto _ : state) {
auto* queue = (*globalDequeList)[0];
for (int i = 0; i < element_count; i++) {
grpc_core::MutexLock lock(&mu);
queue->emplace_back(&closure);
}
for (int i = 0; i < element_count; i++) {
grpc_core::MutexLock lock(&mu);
EventEngine::Closure* popped = queue->back();
queue->pop_back();
assert(popped != nullptr);
}
}
state.counters["Added"] = element_count * state.iterations();
state.counters["Popped"] = state.counters["Added"];
state.counters["Steal Rate"] =
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
if (state.thread_index() == 0) {
delete (*globalDequeList)[0];
}
}
BENCHMARK(BM_StdDequeLIFO)
->Setup(GlobalSetup)
->Teardown(GlobalTeardown)
->Range(1, 512)
->UseRealTime()
->MeasureProcessCPUTime()
->Threads(1)
->Threads(4)
->ThreadPerCpu();
void PerThreadArguments(benchmark::internal::Benchmark* b) {
b->Setup(GlobalSetup)
->Teardown(GlobalTeardown)
->ArgsProduct({/*pop_attempts=*/{10, 50, 250},
/*pct_fill=*/{2, 10, 50}})
->UseRealTime()
->MeasureProcessCPUTime()
->Threads(10)
->ThreadPerCpu();
}
void BM_WorkQueuePerThread(benchmark::State& state) {
WorkQueue local_queue;
{
grpc_core::MutexLock lock(&globalMu);
(*globalWorkQueueList)[state.thread_index()] = &local_queue;
}
AnyInvocableClosure closure([] {});
int element_count = state.range(0);
float pct_fill = state.range(1) / 100.0;
for (auto _ : state) {
// sparsely populate a queue
for (int i = 0; i < std::ceil(element_count * pct_fill); i++) {
local_queue.Add(&closure);
}
// attempt to pop from all thread queues `element_count` times
int pop_attempts = 0;
auto iq = globalWorkQueueList->begin();
while (pop_attempts++ < element_count) {
// may not get a value if the queue is being popped from another thread
(*iq)->PopBack();
if (iq == globalWorkQueueList->end()) {
iq = globalWorkQueueList->begin();
} else {
iq++;
};
}
}
state.counters["Added"] =
std::ceil(element_count * pct_fill) * state.iterations();
state.counters["Steal Attempts"] = element_count * state.iterations();
state.counters["Steal Rate"] = benchmark::Counter(
state.counters["Steal Attempts"], benchmark::Counter::kIsRate);
if (state.thread_index() == 0) {
for (auto* queue : *globalWorkQueueList) {
assert(queue->Empty());
}
}
}
BENCHMARK(BM_WorkQueuePerThread)->Apply(PerThreadArguments);
void BM_StdDequePerThread(benchmark::State& state) {
std::deque<EventEngine::Closure*> local_queue;
(*globalDequeList)[state.thread_index()] = &local_queue;
int element_count = state.range(0);
float pct_fill = state.range(1) / 100.0;
AnyInvocableClosure closure([] {});
auto& local_mu = (*globalDequeMutexList)[state.thread_index()];
for (auto _ : state) {
// sparsely populate a queue
for (int i = 0; i < std::ceil(element_count * pct_fill); i++) {
grpc_core::MutexLock lock(&local_mu);
local_queue.emplace_back(&closure);
}
int pop_attempts = 0;
auto iq = globalDequeList->begin();
auto mu = globalDequeMutexList->begin();
while (pop_attempts++ < element_count) {
{
grpc_core::MutexLock lock(&*mu);
if (!(*iq)->empty()) {
assert((*iq)->back() != nullptr);
(*iq)->pop_back();
}
}
if (iq == globalDequeList->end()) {
iq = globalDequeList->begin();
mu = globalDequeMutexList->begin();
} else {
++iq;
++mu;
};
}
}
state.counters["Added"] =
std::ceil(element_count * pct_fill) * state.iterations();
state.counters["Steal Attempts"] = element_count * state.iterations();
state.counters["Steal Rate"] = benchmark::Counter(
state.counters["Steal Attempts"], benchmark::Counter::kIsRate);
if (state.thread_index() == 0) {
for (auto* queue : *globalDequeList) {
assert(queue->empty());
}
}
}
BENCHMARK(BM_StdDequePerThread)->Apply(PerThreadArguments);
} // namespace
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::benchmark::Initialize(&argc, argv);
benchmark::RunTheBenchmarksNamespaced();
return 0;
}

tools/run_tests/generated/tests.json (generated)

@@ -7893,6 +7893,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "work_queue_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
