bazel
我在单元测试中有以下测试代码:
TEST(threading, record_integers) {
ThreadWorkerPool pool;
int num_threads = 3;
int num_tasks = 7;
pool.add_threads(num_threads);
std::vector<std::future<void>> futures;
std::vector<int> answers(num_tasks);
for (size_t i = 0; i < num_tasks; ++i) {
futures.emplace_back(pool.submit(
[i, &answers]() {
answers[i] = i;
}));
}
for (auto &f : futures) {
f.get();
}
for (size_t i = 0; i < answers.size(); ++i) {
EXPECT_EQ(answers[i], i);
}
}
图书馆有足够的道路里程,我希望它是正确的。但是,当我编译并运行我的测试程序时(在 clang 下)
clang version 10.0.0-4ubuntu1
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin
我从 TSAN 得到以下输出:
CC=clang bazel test --config=tsan cpputil/test:thread_tools_test
INFO: Analyzed target //cpputil/test:thread_tools_test (26 packages loaded, 1741 targets configured).
INFO: Found 1 test target...
FAIL: //cpputil/test:thread_tools_test (see /home/steve/.cache/bazel/_bazel_steve/f50d9463373c4a4449e39ff0ce529ba9/execroot/__main__/bazel-out/k8-fastbuild/testlogs/cpputil/test/thread_tools_test/test.log)
INFO: From Testing //cpputil/test:thread_tools_test:
==================== Test output for //cpputil/test:thread_tools_test:
Running main() from gmock_main.cc
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from threading
[ RUN ] threading.record_integers
ThreadSanitizer:DEADLYSIGNAL
==12==ERROR: ThreadSanitizer: SEGV on unknown address 0x000000000000 (pc 0x7f682eddc6d0 bp 0x7f682c4bf0d0 sp 0x7f682c4bf0b8 T18)
==12==The signal is caused by a WRITE memory access.
==12==Hint: address points to the zero page.
ThreadSanitizer:DEADLYSIGNAL
ThreadSanitizer:DEADLYSIGNAL
#0 __tsan::FuncEntry(__tsan::ThreadState*, unsigned long) ../../../../src/libsanitizer/tsan/tsan_rtl.cpp:1025 (libtsan.so.0+0x9a6d0)
#1 __tsan_func_entry ../../../../src/libsanitizer/tsan/tsan_interface_inl.h:103 (libtsan.so.0+0x9a6d0)
#2 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (BOOM::ThreadWorkerPool::*)(), BOOM::ThreadWorkerPool*> > >::_M_run() <null> (liblibboom.so+0xd734a1)
#3 <null> <null> (libstdc++.so.6+0xd6de3)
#4 start_thread /build/glibc-eX1tMB/glibc-2.31/nptl/pthread_create.c:477 (libpthread.so.0+0x9608)
#5 __clone <null> (libc.so.6+0x122292)
ThreadSanitizer can not provide additional info.
SUMMARY: ThreadSanitizer: SEGV ../../../../src/libsanitizer/tsan/tsan_rtl.cpp:1025 in __tsan::FuncEntry(__tsan::ThreadState*, unsigned long)
==12==ABORTING
================================================================================
Target //cpputil/test:thread_tools_test up-to-date:
bazel-bin/cpputil/test/thread_tools_test
这看起来非常相似,但可能与报告的 TSAN 错误(例如这个)不同。C++ 线程专家(或 TSAN 专家)可以以一种或另一种方式验证吗?
编辑: ThreadTools.hpp 的代码:
#ifndef BOOM_CPPUTIL_THREAD_TOOLS_HPP_
#define BOOM_CPPUTIL_THREAD_TOOLS_HPP_
// Copyright 2018 Google LLC. All Rights Reserved.
/*
Copyright (C) 2005-2016 Steven L. Scott
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
// The main object defined here is the ThreadWorkerPool. Before defining that
// object, we must first define some building blocks.
namespace BOOM {
// A std::vector<std::thread> that calls join() on all joinable
// threads when it goes out of scope.
class ThreadVector : public std::vector<std::thread> {
typedef std::vector<std::thread> ParentType;
public:
ThreadVector() = default;
~ThreadVector() { join_threads(); }
void clear() {
join_threads();
ParentType::clear();
}
void join_threads() {
ParentType &threads(*this);
for (size_t i = 0; i < threads.size(); ++i) {
if (threads[i].joinable()) {
threads[i].join();
}
}
}
};
//======================================================================
// A move-only function wrapper.
class MoveOnlyTaskWrapper {
public:
// An empty task.
MoveOnlyTaskWrapper() = default;
// Construct a task from a function like object with a void(void)
// signature.
//
// cppcheck-suppress noExplicitConstructor
template <typename F> MoveOnlyTaskWrapper(F &&f) // NOLINT
: impl_(new ConcreteFunctor<F>(std::move(f))) {}
// Move constructor
MoveOnlyTaskWrapper(MoveOnlyTaskWrapper &&other)
: impl_(std::move(other.impl_)) {}
// Move-only assignment operator.
MoveOnlyTaskWrapper &operator=(MoveOnlyTaskWrapper &&other) {
impl_ = std::move(other.impl_);
return *this;
}
// Delete copy constructors and the traditional assignment
// operator.
MoveOnlyTaskWrapper(const MoveOnlyTaskWrapper &rhs) = delete;
MoveOnlyTaskWrapper(MoveOnlyTaskWrapper &rhs) = delete;
MoveOnlyTaskWrapper &operator=(const MoveOnlyTaskWrapper &rhs) = delete;
// Invoke the wrapped function.
void operator()() { impl_->call(); }
private:
// A base class that can be stored in a pointer, supplying an
// interface to call().
struct FunctorInterface {
virtual void call() = 0;
virtual ~FunctorInterface() {}
};
// A concrete derived class that can hold functors of various
// types.
template <typename F>
struct ConcreteFunctor : public FunctorInterface {
F f;
// cppcheck-suppress noExplicitConstructor
ConcreteFunctor(F &&f_) : f(std::move(f_)) {} // NOLINT
void call() override { f(); }
};
// A pointer to store the functor.
std::unique_ptr<FunctorInterface> impl_;
};
//======================================================================
// A queue for passing objects between threads. All operations are
// thread safe.
class ThreadSafeTaskQueue {
public:
// Pushes a task onto the queue.
void push(MoveOnlyTaskWrapper &&task);
// Try to pop the front of the queue into the first argument.
// Args:
// task: If there is work to do in the queue, the waiting task
// is moved into the 'task' argument.
// timeout: The maximum amount of time a thread will spend
// waiting for new work before yielding to the next thread.
// If there is no work to do for a long time threads will keep
// yielding to one another. This argument is here to keep
// threads from waiting forever, so that a global 'done' flag
// can be set, allowing threads to exit gracefully.
//
// Returns:
// The return value is true if a task was successfully placed in
// the first argument (i.e. work was found on the queue). It is
// false if there was no available task in the alotted time.
bool wait_and_pop(
MoveOnlyTaskWrapper &task,
std::chrono::milliseconds timeout = std::chrono::milliseconds(100));
// Returns true if the queue is empty, false otherwise.
bool empty() const;
private:
mutable std::mutex task_queue_mutex_;
std::condition_variable new_work_;
std::queue<MoveOnlyTaskWrapper> task_queue_;
};
//======================================================================
// Manages a collection of threads and a ThreadSafeTaskQueue for
// passing work to them.
//
// The idiom for using this is:
//
// ThreadWorkerPool pool;
// pool.add_threads(10); // consider std::hardware_concurrency()
// std::vector<future<void>> futures;
// for (int i = 0; i < 7; ++i) {
// futures.emplace_back(pool.submit([](){do_some_work();}));
// }
// for (int i = 0; i < futures.size(); ++i) {
// futures[i].get();
// }
//
// Note that the call to futures[i].get() passes any exceptions
// encountered by worker threads back to the calling thread.
class ThreadWorkerPool {
public:
// Start a worker pool with the given number of threads.
explicit ThreadWorkerPool(int number_of_threads = 0);
// Shuts down waiting threads.
~ThreadWorkerPool();
// Add the specified number of threads to the pool.
void add_threads(int number_of_additional_threads);
// Sets the number of threads in the pool to the given value. If
// the pool currently contains this many or more joinable threads
// then nothing is done. If (number_of_threads <= 0) then all
// threads are joined and destroyed. If more threads are needed
// then they will be added.
void set_number_of_threads(int number_of_threads);
// Submit a job to the pool.
// Args:
// task: A function-like object with signature void(void),
// representing an item of work to be done by a waiting
// thread.
//
// Returns:
// The return value is a future. Calling wait() on the return
// value will pause the current thread until the job on the
// remote thread completes, or an exception is thrown. If an
// exception is thrown by the remote thread then wait() passes
// it to the current thread.
template <typename FunctionType>
std::future<void> submit(FunctionType work) {
std::packaged_task<void()> task(std::move(work));
std::future<void> res(task.get_future());
work_queue_.push(std::move(task));
return res;
}
// Returns true() if there are currently no threads available to
// do work. Worker threads can be added by calling add_threads().
bool no_threads() const { return threads_.empty(); }
int number_of_threads() const { return threads_.size(); }
int number_of_joinable_threads() const {
int ans = 0;
for (int i = 0; i < threads_.size(); ++i) {
ans += threads_[i].joinable();
}
return ans;
}
private:
// A flag indicating that worker threads should shut down.
std::atomic_bool done_;
// A queue for passing work to worker threads.
ThreadSafeTaskQueue work_queue_;
// The collection of worker threads.
ThreadVector threads_;
// A thread to run in the background. Continually checks to see
// if there is work in the queue. If it finds a task then do it,
// otherwise yield to the next thread.
void worker_thread();
};
} // namespace BOOM
#endif // BOOM_CPPUTIL_THREAD_TOOLS_HPP_
和 ThreadTools.cpp:
// Copyright 2018 Google LLC. All Rights Reserved.
/*
Copyright (C) 2005-2016 Steven L. Scott
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include "cpputil/ThreadTools.hpp"
namespace BOOM {
void ThreadSafeTaskQueue::push(MoveOnlyTaskWrapper &&task) {
std::lock_guard<std::mutex> task_queue_lock(task_queue_mutex_);
new_work_.notify_one();
task_queue_.emplace(std::move(task));
}
bool ThreadSafeTaskQueue::wait_and_pop(MoveOnlyTaskWrapper &task,
std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(task_queue_mutex_);
new_work_.wait_for(lock, timeout,
[this]() { return !task_queue_.empty(); });
if (!task_queue_.empty()) {
task = std::move(task_queue_.front());
task_queue_.pop();
return true;
} else {
return false;
}
}
bool ThreadSafeTaskQueue::empty() const {
std::lock_guard<std::mutex> lock(task_queue_mutex_);
return task_queue_.empty();
}
ThreadWorkerPool::ThreadWorkerPool(int number_of_threads) : done_(false) {
if (number_of_threads > 0) {
add_threads(number_of_threads);
}
}
ThreadWorkerPool::~ThreadWorkerPool() { done_ = true; }
void ThreadWorkerPool::add_threads(int number_of_threads) {
try {
for (int i = 0; i < number_of_threads; ++i) {
threads_.push_back(std::thread(&ThreadWorkerPool::worker_thread, this));
}
} catch (...) {
done_ = true;
throw;
}
}
void ThreadWorkerPool::set_number_of_threads(int n) {
if (n <= 0) {
done_ = true;
threads_.clear();
return;
} else {
int current_number_of_joinable_threads = 0;
done_ = false;
for (int i = 0; i < threads_.size(); ++i) {
current_number_of_joinable_threads += threads_[i].joinable();
}
if (current_number_of_joinable_threads < n) {
add_threads(n - current_number_of_joinable_threads);
}
}
}
void ThreadWorkerPool::worker_thread() {
while (!done_) {
MoveOnlyTaskWrapper task;
if (work_queue_.wait_and_pop(task)) {
task();
} else {
std::this_thread::yield();
}
}
}
} // namespace BOOM