170 lines
5.7 KiB
C++
170 lines
5.7 KiB
C++
|
// Copyright 2020 The Abseil Authors.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// https://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
#include "absl/flags/internal/sequence_lock.h"
|
||
|
|
||
|
#include <algorithm>
|
||
|
#include <atomic>
|
||
|
#include <thread> // NOLINT(build/c++11)
|
||
|
#include <tuple>
|
||
|
#include <vector>
|
||
|
|
||
|
#include "gtest/gtest.h"
|
||
|
#include "absl/base/internal/sysinfo.h"
|
||
|
#include "absl/container/fixed_array.h"
|
||
|
#include "absl/time/clock.h"
|
||
|
|
||
|
namespace {
|
||
|
|
||
|
namespace flags = absl::flags_internal;
|
||
|
|
||
|
class ConcurrentSequenceLockTest
|
||
|
: public testing::TestWithParam<std::tuple<int, int>> {
|
||
|
public:
|
||
|
ConcurrentSequenceLockTest()
|
||
|
: buf_bytes_(std::get<0>(GetParam())),
|
||
|
num_threads_(std::get<1>(GetParam())) {}
|
||
|
|
||
|
protected:
|
||
|
const int buf_bytes_;
|
||
|
const int num_threads_;
|
||
|
};
|
||
|
|
||
|
TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) {
|
||
|
const int buf_words =
|
||
|
flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t);
|
||
|
|
||
|
// The buffer that will be protected by the SequenceLock.
|
||
|
absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words);
|
||
|
for (auto& v : protected_buf) v = -1;
|
||
|
|
||
|
flags::SequenceLock seq_lock;
|
||
|
std::atomic<bool> stop{false};
|
||
|
std::atomic<int64_t> bad_reads{0};
|
||
|
std::atomic<int64_t> good_reads{0};
|
||
|
std::atomic<int64_t> unsuccessful_reads{0};
|
||
|
|
||
|
// Start a bunch of threads which read 'protected_buf' under the sequence
|
||
|
// lock. The main thread will concurrently update 'protected_buf'. The updates
|
||
|
// always consist of an array of identical integers. The reader ensures that
|
||
|
// any data it reads matches that pattern (i.e. the reads are not "torn").
|
||
|
std::vector<std::thread> threads;
|
||
|
for (int i = 0; i < num_threads_; i++) {
|
||
|
threads.emplace_back([&]() {
|
||
|
absl::FixedArray<char> local_buf(buf_bytes_);
|
||
|
while (!stop.load(std::memory_order_relaxed)) {
|
||
|
if (seq_lock.TryRead(local_buf.data(), protected_buf.data(),
|
||
|
buf_bytes_)) {
|
||
|
bool good = true;
|
||
|
for (const auto& v : local_buf) {
|
||
|
if (v != local_buf[0]) good = false;
|
||
|
}
|
||
|
if (good) {
|
||
|
good_reads.fetch_add(1, std::memory_order_relaxed);
|
||
|
} else {
|
||
|
bad_reads.fetch_add(1, std::memory_order_relaxed);
|
||
|
}
|
||
|
} else {
|
||
|
unsuccessful_reads.fetch_add(1, std::memory_order_relaxed);
|
||
|
}
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) {
|
||
|
absl::SleepFor(absl::Milliseconds(1));
|
||
|
}
|
||
|
seq_lock.MarkInitialized();
|
||
|
|
||
|
// Run a maximum of 5 seconds. On Windows, the scheduler behavior seems
|
||
|
// somewhat unfair and without an explicit timeout for this loop, the tests
|
||
|
// can run a long time.
|
||
|
absl::Time deadline = absl::Now() + absl::Seconds(5);
|
||
|
for (int i = 0; i < 100 && absl::Now() < deadline; i++) {
|
||
|
absl::FixedArray<char> writer_buf(buf_bytes_);
|
||
|
for (auto& v : writer_buf) v = i;
|
||
|
seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_);
|
||
|
absl::SleepFor(absl::Microseconds(10));
|
||
|
}
|
||
|
stop.store(true, std::memory_order_relaxed);
|
||
|
for (auto& t : threads) t.join();
|
||
|
ASSERT_GE(good_reads, 0);
|
||
|
ASSERT_EQ(bad_reads, 0);
|
||
|
}
|
||
|
|
||
|
// Simple helper for generating a range of thread counts.
|
||
|
// Generates [low, low*scale, low*scale^2, ...high)
|
||
|
// (even if high is between low*scale^k and low*scale^(k+1)).
|
||
|
std::vector<int> MultiplicativeRange(int low, int high, int scale) {
|
||
|
std::vector<int> result;
|
||
|
for (int current = low; current < high; current *= scale) {
|
||
|
result.push_back(current);
|
||
|
}
|
||
|
result.push_back(high);
|
||
|
return result;
|
||
|
}
|
||
|
|
||
|
#ifndef ABSL_HAVE_THREAD_SANITIZER
|
||
|
const int kMaxThreads = absl::base_internal::NumCPUs();
|
||
|
#else
|
||
|
// With TSAN, a lot of threads contending for atomic access on the sequence
|
||
|
// lock make this test run too slowly.
|
||
|
const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4);
|
||
|
#endif
|
||
|
|
||
|
// Return all of the interesting buffer sizes worth testing:
|
||
|
// powers of two and adjacent values.
|
||
|
std::vector<int> InterestingBufferSizes() {
|
||
|
std::vector<int> ret;
|
||
|
for (int v : MultiplicativeRange(1, 128, 2)) {
|
||
|
ret.push_back(v);
|
||
|
if (v > 1) {
|
||
|
ret.push_back(v - 1);
|
||
|
}
|
||
|
ret.push_back(v + 1);
|
||
|
}
|
||
|
return ret;
|
||
|
}
|
||
|
|
||
|
INSTANTIATE_TEST_SUITE_P(
|
||
|
TestManyByteSizes, ConcurrentSequenceLockTest,
|
||
|
testing::Combine(
|
||
|
// Buffer size (bytes).
|
||
|
testing::ValuesIn(InterestingBufferSizes()),
|
||
|
// Number of reader threads.
|
||
|
testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2))));
|
||
|
|
||
|
// Simple single-threaded test, parameterized by the size of the buffer to be
|
||
|
// protected.
|
||
|
class SequenceLockTest : public testing::TestWithParam<int> {};
|
||
|
|
||
|
TEST_P(SequenceLockTest, SingleThreaded) {
|
||
|
const int size = GetParam();
|
||
|
absl::FixedArray<std::atomic<uint64_t>> protected_buf(
|
||
|
flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t));
|
||
|
|
||
|
flags::SequenceLock seq_lock;
|
||
|
seq_lock.MarkInitialized();
|
||
|
|
||
|
std::vector<char> src_buf(size, 'x');
|
||
|
seq_lock.Write(protected_buf.data(), src_buf.data(), size);
|
||
|
|
||
|
std::vector<char> dst_buf(size, '0');
|
||
|
ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size));
|
||
|
ASSERT_EQ(src_buf, dst_buf);
|
||
|
}
|
||
|
INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest,
|
||
|
// Buffer size (bytes).
|
||
|
testing::Range(1, 128));
|
||
|
|
||
|
} // namespace
|