我在如何以及何时完成可完成的期货方面有点挣扎。我创建了这个测试用例:
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamOfCompletableFuturesTest {
@Test
public void testList() {
completeFirstTwoElements(
Stream.of("list one", "list two", "list three", "list four", "list five")
);
}
@Test
public void testIterator() {
Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
completeFirstTwoElements(
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
);
}
private void completeFirstTwoElements(Stream<String> stream) {
stream
.map(this::cf)
.limit(2)
.parallel()
.forEach(cf -> {
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
private CompletableFuture<String> cf(String result) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Running " + result);
return result;
});
}
}
输出是:
Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one
该testList
方法按预期工作。's 仅在最后CompletableFuture
评估,因此在 limit 方法之后仅保留前两项。
然而,这个testIterator
方法是出乎意料的。所有CompletableFuture
的都完成了,限制只在之后完成。
如果我parallel()
从流中删除该方法,它会按预期工作。但是,处理(forEach()
)应该并行完成,因为在我的完整程序中它是一个长时间运行的方法。
任何人都可以解释为什么会这样吗?
看起来这取决于 Java 版本,所以我使用的是 1.8:
$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)