3

我有以下代码在专用线程上运行函数。除了析构函数外,它工作得很好。调用thread_.join()不返回。我正在使用 VS2013 Express。

我会改变什么以使线程正确连接?

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

namespace
{
    class main_thread
    {
    public:
        static auto instance() -> main_thread&
        {
            static main_thread instance_;
            return instance_;
        }
        auto enque(std::function<void()> func) -> void
        {
            {
                std::lock_guard<std::mutex> lock{ mutex_ };
                queue_.push_back(func);
            }
            condition_.notify_one();
        }
    private:
        main_thread()
        {
            continue_.test_and_set();
            thread_ = std::thread{ std::bind(std::mem_fn(&main_thread::run), this) };
        }
        ~main_thread()
        {
            continue_.clear();
            condition_.notify_all();
            if (thread_.joinable())
            {
                thread_.join();
            }
        }
        main_thread(const main_thread &other) = delete;
        main_thread(main_thread &&other) = delete;
        main_thread& operator=(const main_thread &other) = delete;
        main_thread& operator=(main_thread &&other) = delete;

        auto run() -> void
        {
            while (continue_.test_and_set())
            {
                auto lock = std::unique_lock<std::mutex>{ mutex_ };
                //condition_.wait_for(lock, std::chrono::milliseconds(1));
                condition_.wait(lock);
                for (auto &func : queue_)
                {
                    func();
                }
                queue_.clear();
            }
        }

        std::condition_variable condition_;
        std::mutex mutex_;
        std::vector<std::function<void()>> queue_;
        std::thread thread_;
        std::atomic_flag continue_;
    };
}

auto on_main_thread(std::function<void()> func) -> void
{
    main_thread::instance().enque(std::move(func));
}

auto on_main_thread_sync(std::function<void()> func) -> void
{
    bool done{ false };
    on_main_thread([&]{
        func();
        done = true;
    });
    while (!done);
}

执行此代码的唯一功能是

int main()
{
    on_main_thread([]{});
}

这避免了比赛的问题,on_main_thread_sync但仍然有锁定~main_thread。Visual Studio 表示有 2 个线程,但都不是 in main_thread::run,所以我不明白发生了什么。该函数正确退出,但由于某种原因线程没有结束。

4

2 回答 2

1

您不应该从代码的关键部分调用外部代码,这很容易导致死锁。

如果您在调试器中暂停执行,您可能会看到有一个或多个线程在等待获取 _mutex。

unique_lock如果从 func() 调用的任何代码尝试 enqueue(),您将无法再次获取on_mutex。

condition_variable等待结束后尝试释放锁。作为测试,您可以添加一个额外的范围,看看这是否有帮助:

while (continue_.test_and_set())
{
    std::vector<std::function<void()>> queue;
    {
        auto lock = std::unique_lock<std::mutex>{ mutex_ };
        //condition_.wait_for(lock, std::chrono::milliseconds(1));
        condition_.wait(lock);
        queue.swap(queue_);
    }
    for (auto &func : queue)
    {
        func();
    }
}
于 2013-11-14T06:35:33.107 回答
1

在关闭时,您的代码中有潜在的活锁。以下交错是可能的:

main() 线程 run() 中的线程
                   检查 continue_,看看它是真的
设置 continue_ = false
通知条件变量
加入
                   等待条件变量

为避免这种情况,您需要条件检查和 cv wait 以原子方式发生。这最容易通过continue_使用mutex_( Live at Coliru ) 保护来实现:

class main_thread
{
public:
    static auto instance() -> main_thread&
    {
        static main_thread instance_;
        return instance_;
    }
    auto enque(std::function<void()> func) -> void
    {
        {
            std::lock_guard<std::mutex> lock{ mutex_ };
            queue_.push_back(func);
        }
        condition_.notify_one();
    }
private:
    main_thread() : continue_{true}
    {
        thread_ = std::thread{ &main_thread::run, this };
    }
    ~main_thread()
    {
        {
            std::lock_guard<std::mutex> lock{ mutex_ };
            continue_ = false;
        }
        condition_.notify_all();
        if (thread_.joinable())
        {
            thread_.join();
        }
    }

    auto run() -> void
    {
        std::unique_lock<std::mutex> lock{ mutex_ };
        while(continue_)
        {
            if(queue_.empty())
            {
                condition_.wait(lock);
                continue;
            }

            std::vector<std::function<void()>> queue;
            queue.swap(queue_);
            lock.unlock();
            for (auto &func : queue)
            {
                func();
            }
            lock.lock();
        }
    }

    std::condition_variable condition_;
    std::mutex mutex_;
    std::vector<std::function<void()>> queue_;
    bool continue_;
    std::thread thread_;
};
于 2013-11-14T16:50:43.107 回答