1

编辑:下面

我有一个线程负责从缓冲区中的设备流式传输数据。此外,我有 N 个线程对该数据进行一些处理。在我的设置中,我希望流媒体线程从设备中获取数据,并等待 N 个线程完成处理,然后再获取新数据或达到超时。N 个线程应该等到新数据被提取后再继续处理。我相信如果我不希望 N 个线程在缓冲区上重复处理并且如果我希望处理所有缓冲区而不跳过任何缓冲区,那么这个框架应该可以工作。

仔细阅读后,我发现条件变量正是我所需要的。我遵循了教程和其他堆栈溢出问题,这就是我所拥有的:

全局变量:

boost::condition_variable cond;
boost::mutex mut;

成员变量:

std::vector<double> buffer
std::vector<bool> data_ready       // Size equal to number of threads

数据接收器循环(1个线程运行):

while (!gotExitSignal())
{
    {
        boost::unique_lock<boost::mutex> ll(mut);
        while(any(data_ready))
            cond.wait(ll);
    }

    receive_data(buffer);

    {
        boost::lock_guard<boost::mutex> ll(mut);
        set_true(data_ready);
    }

    cond.notify_all();
}

数据处理循环(N个线程运行这个)

while (!gotExitSignal())
{
    {
        boost::unique_lock<boost::mutex> ll(mut);
        while(!data_ready[thread_id])
            cond.wait(ll);
    }

    process_data(buffer);

    {
        boost::lock_guard<boost::mutex> ll(mut);
        data_ready[thread_id] = false;
    }
    cond.notify_all();
}

这两个循环在它们自己的同一个类的成员函数中。变量buffer是成员变量,所以可以跨线程共享。

接收者线程将首先启动。data_ready 变量是一个大小为 N 的 bool 向量。如果数据已准备好处理,则 data_ready[i] 为真,如果线程已处理数据,则为假。如果 data_ready 的任何元素为真,则函数 any(data_ready) 输出真,否则输出假。set_true(data_ready) 函数将 data_ready 的所有元素设置为 true。接收线程将检查是否有任何处理线程仍在处理。如果没有,它将获取数据,设置 data_ready 标志,通知线程,并继续循环,该循环将在开始处停止,直到处理完成。处理线程将检查它们各自的 data_ready 标志是否为真。一旦为真,处理线程将进行一些计算,将其各自的 data_ready 标志设置为 0,然后继续循环。

如果我只有一个处理线程,程序运行良好。一旦我添加了更多线程,我就会遇到处理输出是垃圾的问题。此外,由于某种原因,处理线程的顺序很重要;换句话说,我启动的最后一个线程将输出正确的数据,而之前的线程将输出垃圾,无论处理的输入参数是什么(假设有效参数)。我不知道问题是由于我的线程代码还是我的设备或数据处理设置有问题。我尝试在处理和接收步骤中使用 couts,并且使用 N 个处理线程,我看到了应该的输出:

receive data
process 1
process 2
...
process N
receive data
process 1
process 2
...

条件变量的使用是否正确?可能是什么问题呢?

编辑:我遵循了 fork 的建议并将代码更改为:

数据接收器循环(1个线程运行):

while (!gotExitSignal())
{
    if(!any(data_ready))
    {
        receive_data(buffer);
        boost::lock_guard<boost::mutex> ll(mut);
        set_true(data_ready);
        cond.notify_all();
    }       
}

数据处理循环(N个线程运行这个)

while (!gotExitSignal())
{
    // boost::unique_lock<boost::mutex> ll(mut);
    boost::mutex::scoped_lock ll(mut);
    cond.wait(ll);

    process_data(buffer);

    data_ready[thread_id] = false;
}

它工作得更好一些。我使用了正确的锁吗?

4

1 回答 1

0

我没有阅读您的全部故事,但如果我快速查看代码,我会发现您使用错误的条件。条件就像一个状态,一旦你将一个线程设置为等待状态,它就会放弃 cpu。因此,您的线程将有效地停止运行,直到其他一些进程/线程通知它。

在您的代码中,您有一个 while 循环,每次检查数据时都会等待。那是错误的,它应该是 if 而不是一段时间。但话又说回来,它不应该在那里。数据检查应该在其他地方进行。并且您的工作线程在完成工作后应该将自己置于等待状态。

您的工作线程是消费者。生产者是提供数据的人。我认为更好的构造是进行线程检查是否有数据并通知工作人员。

伪代码:

//producer
while (true) {

    1. lock mutex
    2. is data available
    3. unlock mutex

    if (dataAvailableVariable) {
        4. notify a worker
        5. set waiting condition
    }
}


//consumer
while (true) {
    1. lock mutex
    2. do some work
    3. unlock mutex
    4. notify producer that work is done
    5. set wait condition
}

您还应该注意一些线程需要处于活动状态以避免死锁的事实,这意味着所有线程都处于等待状态。

我希望这对你有一点帮助。

于 2013-02-04T08:43:03.190 回答