28

我现在遇到了两次问题,生产者线程产生 N 个工作项,将它们提交给一个ExecutorService,然后需要等到所有 N 个项目都被处理完。

注意事项

  • N 事先不知道。如果是,我会简单地创建一个CountDownLatch然后拥有生产者线程await(),直到所有工作完成。
  • 使用 aCompletionService是不合适的,因为尽管我的生产者线程需要阻塞(即通过调用take()),但无法发出所有工作已完成的信号,从而导致生产者线程停止等待。

我目前最喜欢的解决方案是使用整数计数器,并在提交工作项时增加它,并在处理工作项时减少它。在提交所有 N 个任务之后,我的生产者线程将需要等待一个锁,检查是否counter == 0收到通知。消费者线程将需要通知生产者,如果它已经减少了计数器并且新值是 0。

有没有更好的方法来解决这个问题,或者java.util.concurrent我应该使用合适的构造而不是“滚动我自己的”?

提前致谢。

4

6 回答 6

31

java.util.concurrent.Phaser看起来它对你很有效。它计划在 Java 7 中发布,但最稳定的版本可以在jsr166的兴趣组网站上找到。

移相器是一个美化的循环屏障。您可以注册 N 方,并在您准备好时等待他们在特定阶段的推进。

一个关于它如何工作的简单示例:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}
于 2009-10-28T12:42:28.880 回答
2

您当然可以使用 a CountDownLatchprotected by anAtomicReference以便您的任务被包装起来:

public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

请注意,任务运行它们的工作,然后旋转直到设置倒计时(即知道任务的总数)。一旦设置了倒计时,他们就会倒计时并退出。这些提交如下:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

您的作品创建/提交之后,当您知道您创建了多少任务时

latch.set(new CountDownLatch(nTasks));
latch.get().await();
于 2009-10-28T10:08:07.240 回答
1

我已经将 ExecutorCompletionService 用于这样的事情:

ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

这涉及保留已提交的任务队列,因此如果您的列表任意长,您的方法可能会更好。

但请确保使用 anAtomicInteger进行协调,以便您能够在一个线程中增加它,并在工作线程中减少它。

于 2009-10-28T10:10:39.707 回答
0

我假设您的生产者不需要知道队列何时为空,但需要知道最后一个任务何时完成。

我会waitforWorkDone(producer)向消费者添加一个方法。生产者可以添加它的 N 个任务并调用等待方法。当工作队列不为空且当前没有任务正在执行时,wait 方法会阻塞传入线程。

notifyAll()如果任务已完成,队列为空且没有其他任务正在执行,则等待锁上的消费者线程。

于 2009-10-28T10:08:56.400 回答
0

您所描述的与使用标准信号量非常相似,但使用的是“向后”。

  • 您的信号量以 0 个许可开始
  • 每个工作单位完成后发放一张许可证
  • 您通过等待获得 N 个许可来阻止

另外,您可以灵活地仅获得 M < N 许可,如果您想检查中间状态,这很有用。例如,我正在测试一个异步有界消息队列,因此我希望队列在某些 M < N 时已满,因此我可以获取 M 并检查队列是否确实已满,然后在使用来自队列。

于 2017-05-18T15:38:24.210 回答
0

Stream独立的 Java 8+ 方法,使用单次传递正是这样做的PhaserIterator/Iterable变体完全相同。

public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
    Phaser phaser = new Phaser(1); // 1 = register ourselves
    items.forEach(item -> {
            phaser.register(); // new task
            executor.execute(() -> {
                handleItem.accept(item);
                phaser.arrive(); // completed task
            });
    });
    phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
    return phaser.getRegisteredParties()-1; // number of items
}
...
int recognised = submitAndWait(facesInPicture, this::detectFace, executor)

供参考。这对于单个事件来说很好,但是如果同时调用它,则涉及的解决方案ForkJoinPool将阻止父线程阻塞。

于 2018-08-08T11:58:33.987 回答