3

我在如何以及何时完成可完成的期货方面有点挣扎。我创建了这个测试用例:

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
                Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
                .map(this::cf)
                .limit(2)
                .parallel()
                .forEach(cf -> {
                    try {
                        System.out.println(cf.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}

输出是:

Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one

testList方法按预期工作。's 仅在最后CompletableFuture评估,因此在 limit 方法之后仅保留前两项。

然而,这个testIterator方法是出乎意料的。所有CompletableFuture的都完成了,限制只在之后完成。

如果我parallel()从流中删除该方法,它会按预期工作。但是,处理(forEach())应该并行完成,因为在我的完整程序中它是一个长时间运行的方法。

任何人都可以解释为什么会这样吗?

看起来这取决于 Java 版本,所以我使用的是 1.8:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
4

2 回答 2

4

limit()并行性适用于整个管道,因此您无法真正控制在并行应用之前将执行什么Stream。唯一的保证是,之后的limit()内容只会在保留的元素上执行。

两者之间的差异可能是由于一些实现细节或其他Stream特性。事实上,您可以通过使用SIZED特征来轻松地反转行为。似乎当Stream具有已知大小时,只处理 2 个元素。

因此,例如,应用一个简单的filter()会丢失列表版本的大小:

completeFirstTwoElements(
        Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);

输出例如:

Running list one
Running list five
Running list two
Running list three
list one
list two

并且不使用“修复”行为的未知大小版本:Spliterator.spliterator()

Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

completeFirstTwoElements(
        StreamSupport.stream(Spliterators.spliterator(iterator, Spliterator.ORDERED, 5), false)
);

输出:

Running iterator two
Running iterator one
iterator one
iterator two
于 2018-05-16T08:40:41.227 回答
4

您的语句“所有CompletableFutures 都已完成”相当于“所有CompletableFutures 已创建”,因为一旦supplyAsync执行,供应商的评估就已安排好,无论最终是否有人调用get

所以你在这里感知到的,是传递给 的函数的求值map,即使后续处理不会消耗结果。这是一个有效的行为;只要 Stream 之后将使用正确的结果,就限制和遇到顺序而言,该函数可能会以任意顺序甚至同时对更多元素进行评估。

现在,是否会评估超出必要的元素以及处理了多少多余的元素,这是一个实现细节,并且实现已经改变,如“<a href="https://stackoverflow.com/q/50064786/2711488 中所述">限制和无序流的内部更改”。虽然 Q&A 是关于无序流的,但有可能对有序流进行了类似的改进。

要点是,您不应假设仅针对最少数量的必需元素评估函数。这样做会降低并行处理的效率。这仍然适用,即使 Java 9 改进了并行limit操作。一个简单的改变可能会重新引入对更多元素的评估:

private void completeFirstTwoElements(Stream<String> stream) {
    stream.map(this::cf)
          .filter(x -> true)
          .limit(2)
          .parallel()
          .forEach(cf -> System.out.println(cf.join()));
}
于 2018-05-16T08:40:51.450 回答