0

我正在编写一个音频 DSP 应用程序,并且我选择使用生产者-消费者模型。我一直在阅读很多关于volatile和其他线程问题的内容,但是我对我的案例的一些细节有几个问题 - 特别是,我需要在线程之间共享的东西之一是数组数组。

我有一个代表生产者的类。为了允许处理时间的变化,生产者存储n缓冲区,每次有更多音频数据可用时,它都会轮流填充缓冲区,并将缓冲区传递给消费者线程。

我将从我的问题开始,然后我将尝试足够详细地解释我的系统 - 很抱歉这篇长文,感谢您对我的包容!我也非常感谢关于我的实现及其线程安全的一般性评论。

我的缓冲区由volatile byte[][]数组表示。我很清楚,volatile唯一会使参考变得易变,但是在阅读了 SO 和各种博客文章之后,似乎我有两个选择:

我可以使用AtomicIntegerArray. 但:

  • 我会为这样的应用程序牺牲性能吗?

  • 原子性甚至是我需要的吗?我打算一次性写入整个数组,然后我需要它对另一个线程可见,我不需要每个单独的写入都是原子的或立即可见的。

如果我理解正确(例如这篇博文),一个自我分配,在我的例子中是:buffers[currentBuffer] = buffers[currentBuffer]将确保发布,您将在下面的代码中看到。

  • 这是正确的,它会导致所有最近的写入变得可见吗?

  • 这在像这样的二维数组的情况下有效吗?


我将尝试简要概述生产者类;这些是实例变量:

// The consumer - just an interface with a process(byte[]) method
AudioInputConsumer consumer;

// The audio data source
AudioSource source;

// The number of buffers
int bufferCount;

// Controls the main producer loop
volatile boolean isRunning = false;

// The actual buffers
volatile byte[][] buffers;

// The number of buffers left to process.
// Shared counter - the producer inrements and checks it has not run
// out of buffers, while the consumer decremenets when it processes a buffer
AtomicInteger buffersToProcess = new AtomicInteger(0);

// The producer thread.
Thread producerThread;

// The consumer thread.
Thread consumerThread;

一旦我启动producerThreadand consumerThread,它们就分别执行方法producerLoopand consumerLoop

producerLoop在等待音频数据时阻塞,读入缓冲区,对缓冲区执行自分配,然后使用AtomicInteger实例向消费者循环发出信号。

private void producerLoop() {
  int bufferSize = source.getBufferSize();
  int currentBuffer = 0;

  while (isRunning) {
    if (buffersToProcess.get() == bufferCount) {
      //This thread must be faster than the processing thread, we have run out
      // of buffers: decide what to do
      System.err.println("WARNING: run out of buffers");
    }

    source.read(buffers[currentBuffer], 0, bufferSize); // Read data into the buffer
    buffers[currentBuffer] = buffers[currentBuffer];    // Self-assignment to force publication (?)
    buffersToProcess.incrementAndGet();                 // Signal to the other thread that there is data to read
    currentBuffer = (currentBuffer + 1) % bufferCount;  // Next buffer
  }
}

consumerLoop等待直到AtomicInteger buffersToProcess大于零,然后调用消费者对象对数据做任何它想做的事情。之后buffersToProcess递减,我们等待它再次变为非零。

private void consumerLoop() {
  int currentBuffer = 0;

  while (isRunning) {
    if (buffersToProcess.get() > 0) {
      consumer.process(buffers[currentBuffer]);          // Process the data
      buffersToProcess.decrementAndGet();                // Signal that we are done with this buffer
      currentBuffer = (currentBuffer + 1) % bufferCount; // Next buffer
    }
    Thread.yield();
  }
}

非常感谢!

4

3 回答 3

2

您确实需要原子性,因为写入数组是一个非原子过程。具体来说,Java 肯定永远不会保证写入数组成员对其他线程不可见,直到您选择发布它们。

一种选择是每次都创建一个新数组,对其进行完全初始化,然后通过 a 发布volatile,但这可能会产生巨大的成本,因为 Java 坚持新分配的数组必须首先清零,并且由于 GC 开销。您可以通过“双缓冲”方案来克服这个问题,您只保留两个数组并在它们之间切换。这种方法有其危险:一个线程可能仍在从您的写入线程已标记为非活动的数组中读取。这在很大程度上取决于代码的精确细节。

synchronized唯一的其他选择是在一个经典的、无聊的块中完成整个阅读和写作。这具有延迟非常可预测的优点。就个人而言,如果绝对受到实际性能问题的压力,我会从这里开始,然后继续进行任何更复杂的事情。

您也可以使用读写锁进行锁定,但这只有在多个线程同时读取数组时才会奏效。这似乎不是你的情况。

于 2013-08-22T09:09:46.213 回答
1

看看 java.util.concurrent.ArrayBlockingQueue。

当你在阻塞队列上调用 take() 时,它会自动等待直到有东西可用。

只需将 byte[] 放到队列中,消费者就会处理它。在这种情况下,需要知道要处理多少缓冲区是无关紧要的。通常,队列中的“终止”项将代表最后一个缓冲区。将 byte[] 包装在带有布尔终止标志的类中会有所帮助。

问候优素福

于 2013-08-22T09:50:17.913 回答
0

除了@Marko Topolnik 上面解释的 volatile 和 atomic 是如何工作的,如果您仍然想在写入数组时实现跨线程可见性的效果,您可以使用 Unsafe.putLongVolatile()、Unsafe.putObjectVolatile() 和所有其他方法在同一个家庭。

是的,它不像 Java,但它可以解决您的问题。

于 2013-08-23T02:33:04.930 回答