0

bazel我在单元测试中有以下测试代码:

 TEST(threading, record_integers) {
    ThreadWorkerPool pool;
    int num_threads = 3;
    int num_tasks = 7;
    pool.add_threads(num_threads);
    std::vector<std::future<void>> futures;
    std::vector<int> answers(num_tasks);
    for (size_t i = 0; i < num_tasks; ++i) {
      futures.emplace_back(pool.submit(
          [i, &answers]() {
            answers[i] = i;
          }));
    }
    for (auto &f : futures) {
      f.get();
    }
    for (size_t i = 0; i < answers.size(); ++i) {
      EXPECT_EQ(answers[i], i);
    }
  }

该代码使用此处定义并此处实现的线程库。

图书馆有足够的道路里程,我希望它是正确的。但是,当我编译并运行我的测试程序时(在 clang 下)

clang version 10.0.0-4ubuntu1 
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin

我从 TSAN 得到以下输出:

CC=clang bazel test --config=tsan cpputil/test:thread_tools_test 
INFO: Analyzed target //cpputil/test:thread_tools_test (26 packages loaded, 1741 targets configured).
INFO: Found 1 test target...
FAIL: //cpputil/test:thread_tools_test (see /home/steve/.cache/bazel/_bazel_steve/f50d9463373c4a4449e39ff0ce529ba9/execroot/__main__/bazel-out/k8-fastbuild/testlogs/cpputil/test/thread_tools_test/test.log)
INFO: From Testing //cpputil/test:thread_tools_test:
==================== Test output for //cpputil/test:thread_tools_test:
Running main() from gmock_main.cc
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from threading
[ RUN      ] threading.record_integers
ThreadSanitizer:DEADLYSIGNAL
==12==ERROR: ThreadSanitizer: SEGV on unknown address 0x000000000000 (pc 0x7f682eddc6d0 bp 0x7f682c4bf0d0 sp 0x7f682c4bf0b8 T18)
==12==The signal is caused by a WRITE memory access.
==12==Hint: address points to the zero page.
ThreadSanitizer:DEADLYSIGNAL
ThreadSanitizer:DEADLYSIGNAL
    #0 __tsan::FuncEntry(__tsan::ThreadState*, unsigned long) ../../../../src/libsanitizer/tsan/tsan_rtl.cpp:1025 (libtsan.so.0+0x9a6d0)
    #1 __tsan_func_entry ../../../../src/libsanitizer/tsan/tsan_interface_inl.h:103 (libtsan.so.0+0x9a6d0)
    #2 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (BOOM::ThreadWorkerPool::*)(), BOOM::ThreadWorkerPool*> > >::_M_run() <null> (liblibboom.so+0xd734a1)
    #3 <null> <null> (libstdc++.so.6+0xd6de3)
    #4 start_thread /build/glibc-eX1tMB/glibc-2.31/nptl/pthread_create.c:477 (libpthread.so.0+0x9608)
    #5 __clone <null> (libc.so.6+0x122292)

ThreadSanitizer can not provide additional info.
SUMMARY: ThreadSanitizer: SEGV ../../../../src/libsanitizer/tsan/tsan_rtl.cpp:1025 in __tsan::FuncEntry(__tsan::ThreadState*, unsigned long)
==12==ABORTING
================================================================================
Target //cpputil/test:thread_tools_test up-to-date:
  bazel-bin/cpputil/test/thread_tools_test

这看起来非常相似,但可能与报告的 TSAN 错误(例如这个)不同。C++ 线程专家(或 TSAN 专家)可以以一种或另一种方式验证吗?

编辑: ThreadTools.hpp 的代码:

#ifndef BOOM_CPPUTIL_THREAD_TOOLS_HPP_
#define BOOM_CPPUTIL_THREAD_TOOLS_HPP_

// Copyright 2018 Google LLC. All Rights Reserved.
/*
  Copyright (C) 2005-2016 Steven L. Scott

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA
*/

#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>

// The main object defined here is the ThreadWorkerPool.  Before defining that
// object, we must first define some building blocks.

namespace BOOM {

  // A std::vector<std::thread> that calls join() on all joinable
  // threads when it goes out of scope.
  class ThreadVector : public std::vector<std::thread> {
    typedef std::vector<std::thread> ParentType;

   public:
    ThreadVector() = default;
    ~ThreadVector() { join_threads(); }

    void clear() {
      join_threads();
      ParentType::clear();
    }

    void join_threads() {
      ParentType &threads(*this);
      for (size_t i = 0; i < threads.size(); ++i) {
        if (threads[i].joinable()) {
          threads[i].join();
        }
      }
    }
  };

  //======================================================================
  // A move-only function wrapper.
  class MoveOnlyTaskWrapper {
   public:
    // An empty task.
    MoveOnlyTaskWrapper() = default;

    // Construct a task from a function like object with a void(void)
    // signature.
    //
    // cppcheck-suppress noExplicitConstructor
    template <typename F> MoveOnlyTaskWrapper(F &&f)  // NOLINT
        : impl_(new ConcreteFunctor<F>(std::move(f))) {}

    // Move constructor
    MoveOnlyTaskWrapper(MoveOnlyTaskWrapper &&other)
        : impl_(std::move(other.impl_)) {}

    // Move-only assignment operator.
    MoveOnlyTaskWrapper &operator=(MoveOnlyTaskWrapper &&other) {
      impl_ = std::move(other.impl_);
      return *this;
    }

    // Delete copy constructors and the traditional assignment
    // operator.
    MoveOnlyTaskWrapper(const MoveOnlyTaskWrapper &rhs) = delete;
    MoveOnlyTaskWrapper(MoveOnlyTaskWrapper &rhs) = delete;
    MoveOnlyTaskWrapper &operator=(const MoveOnlyTaskWrapper &rhs) = delete;

    // Invoke the wrapped function.
    void operator()() { impl_->call(); }

   private:
    // A base class that can be stored in a pointer, supplying an
    // interface to call().
    struct FunctorInterface {
      virtual void call() = 0;
      virtual ~FunctorInterface() {}
    };

    // A concrete derived class that can hold functors of various
    // types.
    template <typename F>
    struct ConcreteFunctor : public FunctorInterface {
      F f;
      // cppcheck-suppress noExplicitConstructor
      ConcreteFunctor(F &&f_) : f(std::move(f_)) {}  // NOLINT
      void call() override { f(); }
    };

    // A pointer to store the functor.
    std::unique_ptr<FunctorInterface> impl_;
  };

  //======================================================================
  // A queue for passing objects between threads.  All operations are
  // thread safe.
  class ThreadSafeTaskQueue {
   public:
    // Pushes a task onto the queue.
    void push(MoveOnlyTaskWrapper &&task);

    // Try to pop the front of the queue into the first argument.
    // Args:
    //   task: If there is work to do in the queue, the waiting task
    //     is moved into the 'task' argument.
    //   timeout: The maximum amount of time a thread will spend
    //     waiting for new work before yielding to the next thread.
    //     If there is no work to do for a long time threads will keep
    //     yielding to one another.  This argument is here to keep
    //     threads from waiting forever, so that a global 'done' flag
    //     can be set, allowing threads to exit gracefully.
    //
    // Returns:
    //   The return value is true if a task was successfully placed in
    //   the first argument (i.e. work was found on the queue).  It is
    //   false if there was no available task in the alotted time.
    bool wait_and_pop(
        MoveOnlyTaskWrapper &task,
        std::chrono::milliseconds timeout = std::chrono::milliseconds(100));

    // Returns true if the queue is empty, false otherwise.
    bool empty() const;

   private:
    mutable std::mutex task_queue_mutex_;
    std::condition_variable new_work_;
    std::queue<MoveOnlyTaskWrapper> task_queue_;
  };

  //======================================================================
  // Manages a collection of threads and a ThreadSafeTaskQueue for
  // passing work to them.
  //
  // The idiom for using this is:
  //
  // ThreadWorkerPool pool;
  // pool.add_threads(10);  // consider std::hardware_concurrency()
  // std::vector<future<void>> futures;
  // for (int i = 0; i < 7; ++i) {
  //   futures.emplace_back(pool.submit([](){do_some_work();}));
  // }
  // for (int i = 0; i < futures.size(); ++i) {
  //   futures[i].get();
  // }
  //
  // Note that the call to futures[i].get() passes any exceptions
  // encountered by worker threads back to the calling thread.
  class ThreadWorkerPool {
   public:
    // Start a worker pool with the given number of threads.
    explicit ThreadWorkerPool(int number_of_threads = 0);

    // Shuts down waiting threads.
    ~ThreadWorkerPool();

    // Add the specified number of threads to the pool.
    void add_threads(int number_of_additional_threads);

    // Sets the number of threads in the pool to the given value.  If
    // the pool currently contains this many or more joinable threads
    // then nothing is done.  If (number_of_threads <= 0) then all
    // threads are joined and destroyed.  If more threads are needed
    // then they will be added.
    void set_number_of_threads(int number_of_threads);

    // Submit a job to the pool.
    // Args:
    //   task: A function-like object with signature void(void),
    //     representing an item of work to be done by a waiting
    //     thread.
    //
    // Returns:
    //   The return value is a future.  Calling wait() on the return
    //   value will pause the current thread until the job on the
    //   remote thread completes, or an exception is thrown.  If an
    //   exception is thrown by the remote thread then wait() passes
    //   it to the current thread.
    template <typename FunctionType>
    std::future<void> submit(FunctionType work) {
      std::packaged_task<void()> task(std::move(work));
      std::future<void> res(task.get_future());
      work_queue_.push(std::move(task));
      return res;
    }

    // Returns true() if there are currently no threads available to
    // do work.  Worker threads can be added by calling add_threads().
    bool no_threads() const { return threads_.empty(); }

    int number_of_threads() const { return threads_.size(); }

    int number_of_joinable_threads() const {
      int ans = 0;
      for (int i = 0; i < threads_.size(); ++i) {
        ans += threads_[i].joinable();
      }
      return ans;
    }

   private:
    // A flag indicating that worker threads should shut down.
    std::atomic_bool done_;

    // A queue for passing work to worker threads.
    ThreadSafeTaskQueue work_queue_;

    // The collection of worker threads.
    ThreadVector threads_;

    // A thread to run in the background.  Continually checks to see
    // if there is work in the queue.  If it finds a task then do it,
    // otherwise yield to the next thread.
    void worker_thread();
  };

}  // namespace BOOM

#endif  //  BOOM_CPPUTIL_THREAD_TOOLS_HPP_

和 ThreadTools.cpp:

// Copyright 2018 Google LLC. All Rights Reserved.
/*
  Copyright (C) 2005-2016 Steven L. Scott

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA
*/

#include "cpputil/ThreadTools.hpp"

namespace BOOM {

  void ThreadSafeTaskQueue::push(MoveOnlyTaskWrapper &&task) {
    std::lock_guard<std::mutex> task_queue_lock(task_queue_mutex_);
    new_work_.notify_one();
    task_queue_.emplace(std::move(task));
  }

  bool ThreadSafeTaskQueue::wait_and_pop(MoveOnlyTaskWrapper &task,
                                         std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(task_queue_mutex_);
    new_work_.wait_for(lock, timeout,
                       [this]() { return !task_queue_.empty(); });
    if (!task_queue_.empty()) {
      task = std::move(task_queue_.front());
      task_queue_.pop();
      return true;
    } else {
      return false;
    }
  }

  bool ThreadSafeTaskQueue::empty() const {
    std::lock_guard<std::mutex> lock(task_queue_mutex_);
    return task_queue_.empty();
  }

  ThreadWorkerPool::ThreadWorkerPool(int number_of_threads) : done_(false) {
    if (number_of_threads > 0) {
      add_threads(number_of_threads);
    }
  }

  ThreadWorkerPool::~ThreadWorkerPool() { done_ = true; }

  void ThreadWorkerPool::add_threads(int number_of_threads) {
    try {
      for (int i = 0; i < number_of_threads; ++i) {
        threads_.push_back(std::thread(&ThreadWorkerPool::worker_thread, this));
      }
    } catch (...) {
      done_ = true;
      throw;
    }
  }

  void ThreadWorkerPool::set_number_of_threads(int n) {
    if (n <= 0) {
      done_ = true;
      threads_.clear();
      return;
    } else {
      int current_number_of_joinable_threads = 0;
      done_ = false;
      for (int i = 0; i < threads_.size(); ++i) {
        current_number_of_joinable_threads += threads_[i].joinable();
      }
      if (current_number_of_joinable_threads < n) {
        add_threads(n - current_number_of_joinable_threads);
      }
    }
  }

  void ThreadWorkerPool::worker_thread() {
    while (!done_) {
      MoveOnlyTaskWrapper task;
      if (work_queue_.wait_and_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

}  // namespace BOOM
4

0 回答 0