1

I am at wit's end with a simple task group class I wrote in C++11 and which is crashing in a heisenbug fashion (typically with a memory access error under heavy system load and - presumably - heavy thread scheduling pressure). First the code:

class task_group
{
    typedef std::promise<void> p_type;
    typedef std::future<void> f_type;
    typedef std::pair<p_type,f_type> pair_type;
    typedef std::forward_list<pair_type> container_type;
    template <typename Callable>
    struct task_wrapper
    {
        template <typename T>
        explicit task_wrapper(T &&c, pair_type &pair):m_c(std::forward<T>(c)),m_pair(pair)
        {
            m_pair.second = m_pair.first.get_future();
        }
        void operator()()
        {
            try {
                m_c();
                m_pair.first.set_value();
            } catch (...) {
                m_pair.first.set_exception(std::current_exception());
            }
        }
        Callable    m_c;
        pair_type   &m_pair;
    };
public:
    task_group() = default;
    ~task_group()
    {
        wait_all();
    }
    task_group(const task_group &) = delete;
    task_group(task_group &&) = delete;
    task_group &operator=(const task_group &) = delete;
    task_group &operator=(task_group &&) = delete;
    template <typename Callable>
    void add_task(Callable &&c)
    {
        m_container.emplace_front(p_type{},f_type{});
        task_wrapper<typename std::decay<Callable>::type> tw(std::forward<Callable>(c),m_container.front());
        std::thread thr(std::move(tw));
        thr.detach();
    }
    void wait_all()
    {
        std::for_each(m_container.begin(),m_container.end(),[this](pair_type &p) {
            if (p.second.valid()) {
                p.second.wait();
            }
        });
    }
    void get_all()
    {
        std::for_each(m_container.begin(),m_container.end(),[this](pair_type &p) {
            if (p.second.valid()) {
                p.second.get();
            }
        });
    }
private:
    container_type m_container;
};

Tasks (represented by a nullary callable object) can be added via the add_task() method, which launches the callable in a separate thread that gets immediately detached. The callable is actually wrapped in a wrapper structure that just takes care of setting the value of the promise once the callable has finished its work. Promises and futures are stored independently of the thread's lifetime in a persistent forward_list.

I am not doing anything funky with the class, just create it, add tasks and then wait for completion. A stupid example (from the unit test):

task_group tg;
std::vector<unsigned> values(100u);
for (auto i = 0u; i < 100u; ++i) {
    auto s = boost::lexical_cast<std::string>(1u);
    BOOST_CHECK_NO_THROW(tg.add_task([&values,s,i](){values[i] += boost::lexical_cast<unsigned>(s);}));
}
tg.wait_all();
BOOST_CHECK((std::all_of(values.begin(),values.end(),[](unsigned n){return n == 1u;})));

I have tried to run the test case through valgrind but all I seem to get are false positives regarding shared pointers reference counting. The class is so simple that I am convinced I am doing something really stupid, but a lot of googling about promises and futures did not hint at any major misuse on my side.

4

0 回答 0