1

我正在用 Java 构建一个缓冲区,它将收集许多线程请求的写入操作,并将它们作为一个组刷新,例如每秒一次。我想给它一个调用的方法,该方法waitForFlush将阻塞和线程调用它,直到下一个刷新事件完成。同时,一个单独的、独立的线程正在循环中刷新和休眠。所以我基本上是在寻找一种并发结构或模式,它允许许多线程在特定点阻塞,然后同时释放它们,我发现 Java 的内置并发原语都不是非常接近的匹配. 到目前为止,我想出的最好的方法是 wait/notifyAll,如下所示:

public class Buffer {

  private volatile long lastFlushTime = System.currentTimeMillis();
  private final Object flushMonitor = new Object();

  public void waitForFlush() {
    long entryTime = System.currentTimeMillis();

    synchronized(flushMonitor) {
      while(lastFlushTime <= entryTime) {
        flushMonitor.wait();
      }
    }
  }

  public void flush() {
    // do flush stuff here
    synchronized(flushMonitor) {
      lastFlushTime = System.currentTimeMillis();
      flushMonitor.notifyAll();
    }
  }
}

尽管我认为这在实践中可以正常工作,但waitForNotify()对我来说,同步块仍然有些不完美。理想情况下,对于这个用例,您可以在wait()不同步关联对象的情况下进行调用,并且所有被阻塞的线程将在被调用的同一时刻被释放notifyAll(),而不必逐个退出同步块一。

所以,一般来说,有没有比我上面概述的更好的方法来阻塞和同时释放可变数量的线程(我认为 Semaphore 和 CountDownLatch 类只适用于固定数量的线程)?

4

2 回答 2

3

正如 Marko Topolnik 和 munyengm 所提到的,尽管 CountDownLatch 只适用于一次性案例。它在循环场景中失败(即每个 CDL只能await&一次)。countDown然后您可能会考虑使用 CyclicBarrier,但这在您的情况下会失败,因为您需要知道正在使用的线程数。

如果您可以使用 Java 7,我推荐Phaser。您可以向许多等待线程发出单线程信号并重用。

final Phaser phaser = new Phaser(1);//register one thread to arrive

 public void waitForFlush() {
    int phase = phaser.getPhase();
    phaser.awaitAdvance(phase);    
 }

  public void flush() {
      lastFlushTime = System.currentTimeMillis();
      phaser.arrive(); //signals all waiting threads on the current phase and will increment the phase by 1
  }
于 2012-07-07T14:33:19.057 回答
2

如果你使用CountDownLatch得当,我认为它可以为你做到。方法是在闩锁上设置可变数量的线程await,然后刷新线程调用countDown. 锁存器始终初始化为 1。这是它的工作方式:

public class FlushControl
{
  private volatile CountDownLatch latch = new CountDownLatch(1);

  public void awaitFlush() throws InterruptedException { latch.await(); }

  public void flush() {
    final CountDownLatch l = latch;
    latch = new CountDownLatch(1);
    l.countDown();
  }
}
于 2012-07-07T14:25:41.040 回答