Abseil Common Libraries (C++) (grcp 依赖)
https://abseil.io/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
5.3 KiB
155 lines
5.3 KiB
// Copyright 2020 The Abseil Authors. |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// https://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
#include "absl/flags/internal/sequence_lock.h" |
|
|
|
#include <algorithm> |
|
#include <atomic> |
|
#include <thread> // NOLINT(build/c++11) |
|
#include <tuple> |
|
#include <vector> |
|
|
|
#include "gtest/gtest.h" |
|
#include "absl/base/internal/sysinfo.h" |
|
#include "absl/container/fixed_array.h" |
|
#include "absl/time/clock.h" |
|
|
|
namespace { |
|
|
|
namespace flags = absl::flags_internal; |
|
|
|
class ConcurrentSequenceLockTest |
|
: public testing::TestWithParam<std::tuple<int, int>> { |
|
public: |
|
ConcurrentSequenceLockTest() |
|
: buf_bytes_(std::get<0>(GetParam())), |
|
num_threads_(std::get<1>(GetParam())) {} |
|
|
|
protected: |
|
const int buf_bytes_; |
|
const int num_threads_; |
|
}; |
|
|
|
TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) { |
|
const int buf_words = |
|
flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t); |
|
|
|
// The buffer that will be protected by the SequenceLock. |
|
absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words); |
|
for (auto& v : protected_buf) v = -1; |
|
|
|
flags::SequenceLock seq_lock; |
|
std::atomic<bool> stop{false}; |
|
std::atomic<int64_t> bad_reads{0}; |
|
std::atomic<int64_t> good_reads{0}; |
|
std::atomic<int64_t> unsuccessful_reads{0}; |
|
|
|
// Start a bunch of threads which read 'protected_buf' under the sequence |
|
// lock. The main thread will concurrently update 'protected_buf'. The updates |
|
// always consist of an array of identical integers. The reader ensures that |
|
// any data it reads matches that pattern (i.e. the reads are not "torn"). |
|
std::vector<std::thread> threads; |
|
for (int i = 0; i < num_threads_; i++) { |
|
threads.emplace_back([&]() { |
|
absl::FixedArray<char> local_buf(buf_bytes_); |
|
while (!stop.load(std::memory_order_relaxed)) { |
|
if (seq_lock.TryRead(local_buf.data(), protected_buf.data(), |
|
buf_bytes_)) { |
|
bool good = true; |
|
for (const auto& v : local_buf) { |
|
if (v != local_buf[0]) good = false; |
|
} |
|
if (good) { |
|
good_reads.fetch_add(1, std::memory_order_relaxed); |
|
} else { |
|
bad_reads.fetch_add(1, std::memory_order_relaxed); |
|
} |
|
} else { |
|
unsuccessful_reads.fetch_add(1, std::memory_order_relaxed); |
|
} |
|
} |
|
}); |
|
} |
|
while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) { |
|
absl::SleepFor(absl::Milliseconds(1)); |
|
} |
|
seq_lock.MarkInitialized(); |
|
|
|
// Run a maximum of 5 seconds. On Windows, the scheduler behavior seems |
|
// somewhat unfair and without an explicit timeout for this loop, the tests |
|
// can run a long time. |
|
absl::Time deadline = absl::Now() + absl::Seconds(5); |
|
for (int i = 0; i < 100 && absl::Now() < deadline; i++) { |
|
absl::FixedArray<char> writer_buf(buf_bytes_); |
|
for (auto& v : writer_buf) v = i; |
|
seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_); |
|
absl::SleepFor(absl::Microseconds(10)); |
|
} |
|
stop.store(true, std::memory_order_relaxed); |
|
for (auto& t : threads) t.join(); |
|
ASSERT_GE(good_reads, 0); |
|
ASSERT_EQ(bad_reads, 0); |
|
} |
|
|
|
// Simple helper for generating a range of thread counts. |
|
// Generates [low, low*scale, low*scale^2, ...high) |
|
// (even if high is between low*scale^k and low*scale^(k+1)). |
|
std::vector<int> MultiplicativeRange(int low, int high, int scale) { |
|
std::vector<int> result; |
|
for (int current = low; current < high; current *= scale) { |
|
result.push_back(current); |
|
} |
|
result.push_back(high); |
|
return result; |
|
} |
|
|
|
#ifndef ABSL_HAVE_THREAD_SANITIZER |
|
const int kMaxThreads = absl::base_internal::NumCPUs(); |
|
#else |
|
// With TSAN, a lot of threads contending for atomic access on the sequence |
|
// lock make this test run too slowly. |
|
const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4); |
|
#endif |
|
|
|
INSTANTIATE_TEST_SUITE_P( |
|
TestManyByteSizes, ConcurrentSequenceLockTest, |
|
testing::Combine( |
|
// Buffer size (bytes). |
|
testing::Range(1, 128), |
|
// Number of reader threads. |
|
testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2)))); |
|
|
|
// Simple single-threaded test, parameterized by the size of the buffer to be |
|
// protected. |
|
class SequenceLockTest : public testing::TestWithParam<int> {}; |
|
|
|
TEST_P(SequenceLockTest, SingleThreaded) { |
|
const int size = GetParam(); |
|
absl::FixedArray<std::atomic<uint64_t>> protected_buf( |
|
flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t)); |
|
|
|
flags::SequenceLock seq_lock; |
|
seq_lock.MarkInitialized(); |
|
|
|
std::vector<char> src_buf(size, 'x'); |
|
seq_lock.Write(protected_buf.data(), src_buf.data(), size); |
|
|
|
std::vector<char> dst_buf(size, '0'); |
|
ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size)); |
|
ASSERT_EQ(src_buf, dst_buf); |
|
} |
|
INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest, |
|
// Buffer size (bytes). |
|
testing::Range(1, 128)); |
|
|
|
} // namespace
|
|
|