0

我收到一个包含 N 个条目的大文件。对于每个条目,我正在创建一个新线程。我需要等待所有 N 个线程被终止。

一开始我使用的是 Phaser,但它的实现仅限于 65K 方。所以,是因为 N 可能像 100K 一样爆炸。

然后,我尝试了 CountDownLatch。这很好用,非常简单的概念和非常简单的实现。但是我不知道N的数量。

Phaser 是我的解决方案,但有这个限制。

有任何想法吗?

这篇文章是相关的: 灵活的 CountDownLatch?

4

3 回答 3

1

听起来您要解决的问题是尽快处理大量任务并等待处理完成。

同时处理大量任务的问题在于,它可能会导致过多的上下文切换,并且基本上会削弱您的机器并减慢处理速度,使其超过一定数量的(取决于硬件的)并发线程。这意味着您需要对正在执行的并发工作线程设置上限。

Phaser 和 CountDownLatch 都是同步原语,它们的目的是提供对关键代码块的访问控制,而不是管理并行执行。

在这种情况下,我会使用Executor 服务。它支持添加任务(以多种形式,包括Runnable)。

ExecutorService您可以使用Executors类轻松创建一个。我建议为此使用固定大小的线程池,最大线程数为 20-100 - 取决于您的任务的 CPU 密集程度。任务所需的计算能力越多,可以处理的并行线程数量就越少,而不会严重降低性能。

有多种方法可以等待所有任务完成:

  • 收集Future该方法返回的所有实例,submit然后简单地对所有实例调用get。这可确保在循环完成时执行每个任务。
  • 关闭执行器服务,等待所有提交的任务完成。此方法的缺点是您必须指定等待任务完成的最长时间。此外,它不太优雅,您并不总是想要关闭Executor,这取决于您是在编写单次应用程序还是在之后继续运行的服务器 - 如果是服务器应用程序,您肯定必须采用以前的方法。

最后,这是一个说明所有这些的代码片段:

List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);

for(TaskFromFile task : tasks) {
    // createRunnable is not necessary in case your task implements Runnable
    executor.submit(createRunnable(task));
}

// assuming single-shot batch job
executor.shutdown();
executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
于 2015-10-28T20:38:02.897 回答
1

ReusableCountLatch也是CountDownLatch允许增量的替代方法。

用法如下:

ReusableCountLatch latch = new ReusableCountLatch(); // creates latch with initial count 0
ReusableCountLatch latch = new ReusableCountLatch(10); // creates latch with initial count 10

latch.increment(); // increments counter

latch.decrement(); // decrement counter

latch.waitTillZero(); // blocks until counts falls to zero

boolean succeeded = latch.waitTillZero(200, MILLISECONDS); // waits for up to 200 milliseconds until count falls to zero

int count = latch.getCount(); // gets actual count

要使用它,只需将此 gradle/maven 依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.3.1'

更多细节可以在这里找到:https ://github.com/MatejTymes/JavaFixes

于 2016-07-14T19:27:13.213 回答
0

使用 AtomicInteger,您可以轻松实现相同的目标。用 1 初始化并随着每个新线程递增。一旦在工人和生产者中完成,减量并获得。如果零运行您的整理 Runnable。

于 2015-10-28T20:26:59.257 回答