编辑:下面
我有一个线程负责从缓冲区中的设备流式传输数据。此外,我有 N 个线程对该数据进行一些处理。在我的设置中,我希望流媒体线程从设备中获取数据,并等待 N 个线程完成处理,然后再获取新数据或达到超时。N 个线程应该等到新数据被提取后再继续处理。我相信如果我不希望 N 个线程在缓冲区上重复处理并且如果我希望处理所有缓冲区而不跳过任何缓冲区,那么这个框架应该可以工作。
仔细阅读后,我发现条件变量正是我所需要的。我遵循了教程和其他堆栈溢出问题,这就是我所拥有的:
全局变量:
boost::condition_variable cond;
boost::mutex mut;
成员变量:
std::vector<double> buffer
std::vector<bool> data_ready // Size equal to number of threads
数据接收器循环(1个线程运行):
while (!gotExitSignal())
{
{
boost::unique_lock<boost::mutex> ll(mut);
while(any(data_ready))
cond.wait(ll);
}
receive_data(buffer);
{
boost::lock_guard<boost::mutex> ll(mut);
set_true(data_ready);
}
cond.notify_all();
}
数据处理循环(N个线程运行这个)
while (!gotExitSignal())
{
{
boost::unique_lock<boost::mutex> ll(mut);
while(!data_ready[thread_id])
cond.wait(ll);
}
process_data(buffer);
{
boost::lock_guard<boost::mutex> ll(mut);
data_ready[thread_id] = false;
}
cond.notify_all();
}
这两个循环在它们自己的同一个类的成员函数中。变量buffer是成员变量,所以可以跨线程共享。
接收者线程将首先启动。data_ready 变量是一个大小为 N 的 bool 向量。如果数据已准备好处理,则 data_ready[i] 为真,如果线程已处理数据,则为假。如果 data_ready 的任何元素为真,则函数 any(data_ready) 输出真,否则输出假。set_true(data_ready) 函数将 data_ready 的所有元素设置为 true。接收线程将检查是否有任何处理线程仍在处理。如果没有,它将获取数据,设置 data_ready 标志,通知线程,并继续循环,该循环将在开始处停止,直到处理完成。处理线程将检查它们各自的 data_ready 标志是否为真。一旦为真,处理线程将进行一些计算,将其各自的 data_ready 标志设置为 0,然后继续循环。
如果我只有一个处理线程,程序运行良好。一旦我添加了更多线程,我就会遇到处理输出是垃圾的问题。此外,由于某种原因,处理线程的顺序很重要;换句话说,我启动的最后一个线程将输出正确的数据,而之前的线程将输出垃圾,无论处理的输入参数是什么(假设有效参数)。我不知道问题是由于我的线程代码还是我的设备或数据处理设置有问题。我尝试在处理和接收步骤中使用 couts,并且使用 N 个处理线程,我看到了应该的输出:
receive data
process 1
process 2
...
process N
receive data
process 1
process 2
...
条件变量的使用是否正确?可能是什么问题呢?
编辑:我遵循了 fork 的建议并将代码更改为:
数据接收器循环(1个线程运行):
while (!gotExitSignal())
{
if(!any(data_ready))
{
receive_data(buffer);
boost::lock_guard<boost::mutex> ll(mut);
set_true(data_ready);
cond.notify_all();
}
}
数据处理循环(N个线程运行这个)
while (!gotExitSignal())
{
// boost::unique_lock<boost::mutex> ll(mut);
boost::mutex::scoped_lock ll(mut);
cond.wait(ll);
process_data(buffer);
data_ready[thread_id] = false;
}
它工作得更好一些。我使用了正确的锁吗?