8

默认情况下,Java 流由使用默认参数构造的公共线程池处理。正如在另一个问题中已回答的那样,可以通过指定自定义池或设置java.util.concurrent.ForkJoinPool.common.parallelism系统参数来调整这些默认值。

但是,我无法通过这两种方法中的任何一种来增加分配给流处理的线程数。例如,考虑下面的程序,它处理包含在其第一个参数中指定的文件中的 IP 地址列表并输出解析的地址。在具有大约 13000 个唯一 IP 地址的文件上运行此程序,我发现使用Oracle Java Mission Control的线程少至 16 个。其中,只有五个是ForkJoinPool工人。然而,这个特定的任务将受益于更多的线程,因为线程大部分时间都在等待 DNS 响应。所以我的问题是,我怎样才能真正增加使用的线程数?

我已经在三个环境中尝试过该程序;这些是操作系统报告的线程数。

  • Java SE Runtime Environment 在运行 Windows 7 的 8 核机器上构建 1.8.0_73-b02:17 个线程
  • Java SE 运行时环境在运行 OS X Darwin 15.2.0 的 2 核机器上构建 1.8.0_66-b17:23 个线程
  • 运行 FreeBSD 11.0 的 24 核机器上的 openjdk 版本 1.8.0_72:44 个线程

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;

/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
    /** Resolve the passed IP address into a name */
    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress;
        }
    }

    public static void main(String[] args) {
        Path path = Paths.get(args[0]);
        ForkJoinPool fjp = new ForkJoinPool(100);
        try {
            fjp.submit(() -> {
                try {
                    Files.lines(path)
                    .parallel()
                    .map(line -> addressName(line))
                    .forEach(System.out::println);
                } catch (IOException e) {
                    System.err.println("Failed: " + e);
                }
            }).get();
        } catch (Exception e) {
            System.err.println("Failed: " + e);
        }
    }
}
4

1 回答 1

8

你的方法有两个问题。首先是使用自定义 FJP 不会更改由流 API 创建的单个任务的最大数量,因为这是通过以下方式定义的

static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

因此,即使您使用自定义池,并行任务的数量也会受到commonPoolParallelism * 4. (实际上不是硬限制,而是一个目标,但在很多情况下,任务数等于这个数)。

上面的问题可以通过使用java.util.concurrent.ForkJoinPool.common.parallelism系统属性来解决,但是在这里你遇到了另一个问题:Files.lines并行化非常糟糕。有关详细信息,请参阅此问题。特别是,对于 13000 条输入线,最大可能的加速是 3.17 倍(假设每条线的处理时间大致相同),即使您有 100 个 CPU。我的StreamEx库为此提供了解决方法(使用 创建流StreamEx.ofLines(path).parallel())。另一种可能的解决方案是将文件行顺序读取到List中,然后从中创建并行流:

Files.readAllLines(path).parallelStream()...

这将与系统属性一起使用。然而,一般来说,当任务涉及 I/O 时,Stream API 并不适合并行处理。更灵活的解决方案是CompletableFuture为每一行使用:

ForkJoinPool fjp = new ForkJoinPool(100);
List<CompletableFuture<String>> list = Files.lines(path)
    .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
    .collect(Collectors.toList());
list.stream().map(CompletableFuture::join)
    .forEach(System.out::println);

这样您就不需要调整系统属性,并且可以将单独的池用于单独的任务。

于 2016-02-23T16:17:40.493 回答