我有以下代码无法按预期工作(跳过了随机行,而不是第一行):
Files.lines(path)
.skip(1)
.parallel()
.forEach( System.out::println )
我有一种感觉,我误解了 Streams 的行为。问题是:我可以先将流视为顺序的(并使用“有状态的中间操作”),然后将其输入并行forEach
吗?
我有以下代码无法按预期工作(跳过了随机行,而不是第一行):
Files.lines(path)
.skip(1)
.parallel()
.forEach( System.out::println )
我有一种感觉,我误解了 Streams 的行为。问题是:我可以先将流视为顺序的(并使用“有状态的中间操作”),然后将其输入并行forEach
吗?
整个流水线要么是并行的,要么是顺序的。
尝试使用forEachOrdered
而不是forEach
. 在我的测试中,如果forEachOrdered
使用它会跳过第一行(forEach
它会跳过最后一行)。
forEach
忽略遇到顺序,似乎也可以使其他操作忽略它。
不,你不能那样做。但是,您的代码应该可以按预期工作,来自Stream.skip
javadocs
虽然 skip() 在顺序流管道上通常是一种便宜的操作,但在有序并行管道上可能会非常昂贵,尤其是对于较大的 n 值,因为 skip(n) 被限制为不仅跳过任何 n 个元素,而且跳过第一个 n相遇顺序中的元素。如果您的情况的语义允许,使用无序流源(例如 generate(Supplier))或使用 BaseStream.unordered() 删除排序约束可能会导致并行管道中的 skip() 显着加速。如果需要与遇到顺序保持一致,并且您在并行管道中使用 skip() 时遇到性能或内存利用率不佳的情况,则使用 BaseStream.sequential() 切换到顺序执行可能会提高性能。
您的代码是否有效取决于返回的流的性质Files.lines(..)
,这取决于该流是否有效Ordered
。这些特征由使用的 Spliterator 设置,如果流是有序的,那么它将始终跳过第一个元素。如果流是无序的,那么它将跳过一个元素。
http://download.java.net/jdk8/docs/api/java/util/Spliterator.html
似乎 skip(n) 不会跳过并行流上的前 n 个元素。
解决方案,使用 BufferedReader readLine() 方法将前 [n] 行切掉。
然后获取 Stream ,它将在您离开阅读器的地方继续:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.stream.IntStream;
public class TestStreams {
public static void main(String[] args) throws Exception{
unordered();
}
public static void unordered() throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
IntStream.range(0, 1000).forEach(n -> sb.append(n).append("\n"));
try (BufferedReader br = new BufferedReader(new StringReader(sb.toString()))) {
if (br.readLine() != null) {
br.lines()
.parallel()
.forEach(it -> System.out.println(Thread.currentThread() + " : " + it));
}
}
}
}