8

我正在尝试使用 boost::thread 在 C++ 上的线程上实现 Actor 计算模型。但是程序在执行过程中抛出了奇怪的异常。异常并不稳定,有时程序以正确的方式工作。

那里有我的代码:

演员.hpp

class Actor {

  public:
    typedef boost::function<int()> Job;

  private:
    std::queue<Job>             d_jobQueue;
    boost::mutex                d_jobQueueMutex;
    boost::condition_variable   d_hasJob;
    boost::atomic<bool>         d_keepWorkerRunning;
    boost::thread               d_worker;

    void workerThread();

  public:
    Actor();
    virtual ~Actor();

    void execJobAsync(const Job& job);

    int execJobSync(const Job& job);
};

演员.cpp

namespace {

int executeJobSync(std::string          *error,
                   boost::promise<int> *promise,
                   const Actor::Job     *job)
{
    int rc = (*job)();

    promise->set_value(rc);
    return 0;
}

}

void Actor::workerThread()
{
    while (d_keepWorkerRunning) try {
        Job job;
        {
            boost::unique_lock<boost::mutex> g(d_jobQueueMutex);

            while (d_jobQueue.empty()) {
                d_hasJob.wait(g);
            }

            job = d_jobQueue.front();
            d_jobQueue.pop();
        }

        job();
    }
    catch (...) {
        // Log error
    }
}

void Actor::execJobAsync(const Job& job)
{
    boost::mutex::scoped_lock g(d_jobQueueMutex);
    d_jobQueue.push(job);
    d_hasJob.notify_one();
}

int Actor::execJobSync(const Job& job)
{
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();

    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
        d_hasJob.notify_one();
    }

    int rc = future.get();

    if (rc) {
        ErrorUtil::setLastError(rc, error.c_str());
    }

    return rc;
}

Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}

Actor::~Actor()
{
    d_keepWorkerRunning = false;
    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_hasJob.notify_one();
    }
    d_worker.join();
}

实际上抛出的异常是 boost::thread_interrupted int rc = future.get();。但是形成 boost 文档我不能解释这个例外。文档说

抛出: - boost::thread_interrupted 如果与 *this 关联的结果在调用时尚未准备好,并且当前线程被中断。

但是我的工作线程不能处于中断状态。

当我使用 gdb 并设置“catch throw”时,我看到回溯看起来像

抛出 thread_interrupted

boost::detail::interruption_checker::check_for_interruption

boost::detail::interruption_checker::interruption_checker

boost::condition_variable::wait

boost::detail::future_object_base::wait_internal

boost::detail::future_object_base::wait

boost::detail::future_object::get

boost::unique_future::get

我查看了提升源,但不明白为什么 interrupt_checker 决定工作线程被中断。

所以有人C ++大师,请帮助我。我需要做什么才能获得正确的代码?我在用着:

提升 1_53

Linux 版本 2.6.18-194.32.1.el5 红帽 4.1.2-48

海合会 4.7

编辑

解决它!感谢 Evgeny Panasyuk 和 Lazin。问题在于 TLS 管理。boost::thread 和 boost::thread_specific_ptr 使用相同的 TLS 存储来实现它们的目的。就我而言,当他们都试图在创建时更改此存储时出现问题(不幸的是,我不明白为什么会发生这种情况)。所以 TLS 被破坏了。

我用 __thread 指定的变量替换了代码中的 boost::thread_specific_ptr。

Offtop:在调试过程中,我发现外部库中的内存损坏并修复了它 =)

.

编辑 2 我得到了确切的问题...这是 GCC 中的一个错误 =) _GLIBCXX_DEBUG 编译标志破坏了 ABI。你可以看到关于 boost bugtracker 的讨论: https ://svn.boost.org/trac/boost/ticket/7666

4

2 回答 2

5

我发现了几个错误:


Actor::workerThread功能确实双重解锁d_jobQueueMutex。第一个解锁是手动d_jobQueueMutex.unlock();的,第二个是在析构函数中boost::unique_lock<boost::mutex>

您应该防止其中一种解锁,例如释放unique_lock和之间的关联mutex

g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();

或者添加额外的代码块 + default-constructed Job


有可能workerThread永远不会离开以下循环:

while (d_jobQueue.empty()) {
    d_hasJob.wait(g);
}

想象以下情况:d_jobQueue为空,Actor::~Actor()被调用,设置标志并通知工作线程:

d_keepWorkerRunning = false;
d_hasJob.notify_one();

workerThread在 while 循环中醒来,看到队列为空并再次休眠。

通常的做法是发送特殊的最终作业来停止工作线程:

~Actor()
{
    execJobSync([this]()->int
    {
        d_keepWorkerRunning = false;
        return 0;
    });
    d_worker.join();
}

在这种情况下,d_keepWorkerRunning不需要是原子的。


Coliru 上的现场演示


编辑

我已将事件队列代码添加到您的示例中。

EventQueueImpl您在和中都有并发队列Actor,但对于不同的类型。可以将公共部分提取到concurrent_queue<T>适用于任何类型的单独实体中。在一个地方调试和测试队列比捕获分散在不同类中的错误要容易得多。

所以,你可以尝试使用这个concurrent_queue<T>(在 Coliru 上)

于 2013-10-28T07:31:14.750 回答
2

这只是一个猜测。我认为有些代码实际上可以调用boost::tread::interrupt()。您可以为此函数设置断点并查看对此负责的代码。您可以测试中断execJobSync

int Actor::execJobSync(const Job& job)
{
    if (boost::this_thread::interruption_requested())
        std::cout << "Interruption requested!" << std::endl;
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();

在这种情况下,最可疑的代码是引用了线程对象的代码。

无论如何,让你的 boost::thread 代码中断是个好习惯。也可以在某些范围内禁用中断

如果不是这种情况 - 您需要检查与线程本地存储一起使用的代码,因为线程中断标志存储在 TLS 中。也许你的一些代码会重写它。您可以在此类代码片段之前和之后检查中断。

另一种可能是您的内存已损坏。如果没有代码调用 boost::thread::interrupt() 并且您不使用 TLS。这是最难的情况,尝试使用一些动态分析器 - valgrind 或 clang memory sanitizer。

题外话: 您可能需要使用一些并发队列。std::queue 会因为高内存争用而变得非常慢,并且最终会导致缓存性能不佳。良好的并发队列允许您的代码并行地使元素入队和出队。

此外,actor 不是应该执行任意代码的东西。Actor 队列必须接收简单的消息,而不是函数!您正在编写作业队列 :) 您需要查看一些参与者系统,例如Akkalibcpa

于 2013-10-29T11:58:55.323 回答