我正在编写一个音频 DSP 应用程序,并且我选择使用生产者-消费者模型。我一直在阅读很多关于volatile
和其他线程问题的内容,但是我对我的案例的一些细节有几个问题 - 特别是,我需要在线程之间共享的东西之一是数组数组。
我有一个代表生产者的类。为了允许处理时间的变化,生产者存储n
缓冲区,每次有更多音频数据可用时,它都会轮流填充缓冲区,并将缓冲区传递给消费者线程。
我将从我的问题开始,然后我将尝试足够详细地解释我的系统 - 很抱歉这篇长文,感谢您对我的包容!我也非常感谢关于我的实现及其线程安全的一般性评论。
我的缓冲区由volatile byte[][]
数组表示。我很清楚,volatile
唯一会使参考变得易变,但是在阅读了 SO 和各种博客文章之后,似乎我有两个选择:
我可以使用AtomicIntegerArray
. 但:
我会为这样的应用程序牺牲性能吗?
原子性甚至是我需要的吗?我打算一次性写入整个数组,然后我需要它对另一个线程可见,我不需要每个单独的写入都是原子的或立即可见的。
如果我理解正确(例如这篇博文),一个自我分配,在我的例子中是:buffers[currentBuffer] = buffers[currentBuffer]
将确保发布,您将在下面的代码中看到。
这是正确的,它会导致所有最近的写入变得可见吗?
这在像这样的二维数组的情况下有效吗?
我将尝试简要概述生产者类;这些是实例变量:
// The consumer - just an interface with a process(byte[]) method
AudioInputConsumer consumer;
// The audio data source
AudioSource source;
// The number of buffers
int bufferCount;
// Controls the main producer loop
volatile boolean isRunning = false;
// The actual buffers
volatile byte[][] buffers;
// The number of buffers left to process.
// Shared counter - the producer inrements and checks it has not run
// out of buffers, while the consumer decremenets when it processes a buffer
AtomicInteger buffersToProcess = new AtomicInteger(0);
// The producer thread.
Thread producerThread;
// The consumer thread.
Thread consumerThread;
一旦我启动producerThread
and consumerThread
,它们就分别执行方法producerLoop
and consumerLoop
。
producerLoop
在等待音频数据时阻塞,读入缓冲区,对缓冲区执行自分配,然后使用AtomicInteger
实例向消费者循环发出信号。
private void producerLoop() {
int bufferSize = source.getBufferSize();
int currentBuffer = 0;
while (isRunning) {
if (buffersToProcess.get() == bufferCount) {
//This thread must be faster than the processing thread, we have run out
// of buffers: decide what to do
System.err.println("WARNING: run out of buffers");
}
source.read(buffers[currentBuffer], 0, bufferSize); // Read data into the buffer
buffers[currentBuffer] = buffers[currentBuffer]; // Self-assignment to force publication (?)
buffersToProcess.incrementAndGet(); // Signal to the other thread that there is data to read
currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
}
}
consumerLoop
等待直到AtomicInteger
buffersToProcess
大于零,然后调用消费者对象对数据做任何它想做的事情。之后buffersToProcess
递减,我们等待它再次变为非零。
private void consumerLoop() {
int currentBuffer = 0;
while (isRunning) {
if (buffersToProcess.get() > 0) {
consumer.process(buffers[currentBuffer]); // Process the data
buffersToProcess.decrementAndGet(); // Signal that we are done with this buffer
currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
}
Thread.yield();
}
}
非常感谢!