-2

我有一个程序可以从使用中受益匪浅(大型数据集,其中涉及一些映射和过滤方案,但不依赖于外部变量/同步)parallelStream(),但必须作为一个集合进行收集。

我对并行流有些陌生(这是我第一次使用它们),并尝试使用下面的代码却发现这导致了非并发后端的并发修改和幕后的死锁条件。

此映射尝试使用 Linux 本机命令获取未挂载磁盘的文件大小sudo blockdev --getsize64 unmounted_device_here(我不知道 Java 是否可以在 Linux 上获取未挂载磁盘的完整大小,所以我只是使用本机方法,因为这只会是无论如何都在Linux系统上发布)

映射方法(死锁):

var mountPath = Paths.get("/dev");
            //Do NVME Drives First
            var list = new ArrayList<Path>(10);
            //Looks like nvme1n1
            //For reasons beyond my understanding replacing [0-9] with \\d does not work here
            try (var directoryStream = Files.newDirectoryStream(mountPath, "nvme[0-9]n[0-9]")) {
                for (Path path : directoryStream) {
                    list.add(path);
                }
            }
//Map to DrivePacket (path, long), note that blockdev return bytes -> GB
var nvmePackets = list.parallelStream().map((drive) -> new DrivePacket(drive,
                    (Long.parseLong(runCommand("sudo", "blockdev", "--getsize64", drive.toAbsolutePath().toString())) / (1024 * 1024 * 1024))))
                    .collect(Collectors.toSet());

IOUtils 来自 Apache 实用程序类:

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

runCommand(执行本机调用):

public static String runCommand(String... command) {
        try {
            if (DEBUG_MODE) {
                systemMessage("Running Command: " + Arrays.asList(command).stream().collect(Collectors.joining(" ")));
            }
            var builder = new ProcessBuilder(command);
            var result = IOUtils.toString(builder.start().getInputStream(), StandardCharsets.UTF_8).replaceAll("\n", "");
            if (DEBUG_MODE) {
                System.out.println("Result: " + result);
            }
            return result;
        } catch (IOException ex) {
            throw new IllegalStateException(ex);

        }
    }

DrivePacket 类:

   /**
     * A record of the relevant information for a drive
     *
     * Path is the fully qualified /dev/DRIVE path
     */
    public record DrivePacket(Path drivePath, long driveSize) {}

由于操作受益于并发性,有没有办法使用parallelStream?还是我必须使用其他技术?

它总是挂在执行这行代码的停止处,并在我使用调试器时永远ForkJoinTask.java等待externalAwaitDone();

不幸的是,我找不到Set类似的东西。toConcurrentMap()

我怎样才能避免这种死锁,同时仍然获得计算的并行性并让最终结果成为一个集合?

系统:JDK 16

编辑 0:更新了可再现性代码

鉴于映射代码调用不共享数据的子例程,我不确定为什么这会导致死锁情况。

4

1 回答 1

1

也许你可以把它收集到 aConcurrentMap然后得到keySet. (假设为映射对象定义了方法)equalshashcode

list.parallelStream()
    .map(x -> complicated_mapping_here)
    .collect(Collectors.toConcurrentMap(Function.identity(),
                                        x -> Boolean.TRUE /*dummy value*/ ))
    .keySet();
于 2021-06-03T07:59:37.170 回答