7

我有以下代码无法按预期工作(跳过了随机行,而不是第一行):

Files.lines(path)
     .skip(1)
     .parallel()
     .forEach( System.out::println )

我有一种感觉,我误解了 Streams 的行为。问题是:我可以先将流视为顺序的(并使用“有状态的中间操作”),然后将其输入并行forEach吗?

4

4 回答 4

2

整个流水线要么是并行的,要么是顺序的。

尝试使用forEachOrdered而不是forEach. 在我的测试中,如果forEachOrdered使用它会跳过第一行(forEach它会跳过最后一行)。

forEach忽略遇到顺序,似乎也可以使其他操作忽略它。

于 2013-12-19T10:21:29.367 回答
1

不,你不能那样做。但是,您的代码应该可以按预期工作,来自Stream.skipjavadocs

虽然 skip() 在顺序流管道上通常是一种便宜的操作,但在有序并行管道上可能会非常昂贵,尤其是对于较大的 n 值,因为 skip(n) 被限制为不仅跳过任何 n 个元素,而且跳过第一个 n相遇顺序中的元素。如果您的情况的语义允许,使用无序流源(例如 generate(Supplier))或使用 BaseStream.unordered() 删除排序约束可能会导致并行管道中的 skip() 显着加速。如果需要与遇到顺序保持一致,并且您在并行管道中使用 skip() 时遇到性能或内存利用率不佳的情况,则使用 BaseStream.sequential() 切换到顺序执行可能会提高性能。

您的代码是否有效取决于返回的流的性质Files.lines(..),这取决于该流是否有效Ordered。这些特征由使用的 Spliterator 设置,如果流是有序的,那么它将始终跳过第一个元素。如果流是无序的,那么它将跳过一个元素。

http://download.java.net/jdk8/docs/api/java/util/Spliterator.html

于 2013-12-17T17:58:57.637 回答
1

这不是一个错误,而是一个功能。调用parallel()使整个流并行。除非进行后续调用,否则会将sequential()整个流设置回顺序模式。

javaodoc说:

返回一个等效的并行流。

于 2013-12-17T17:06:23.433 回答
0

似乎 skip(n) 不会跳过并行流上的前 n 个元素。

解决方案,使用 BufferedReader readLine() 方法将前 [n] 行切掉。

然后获取 Stream ,它将在您离开阅读器的地方继续:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.stream.IntStream;

public class TestStreams {

    public static void main(String[] args) throws Exception{
         unordered();
    }

    public static void unordered() throws IOException, InterruptedException {

        StringBuilder sb = new StringBuilder();
        IntStream.range(0, 1000).forEach(n -> sb.append(n).append("\n"));

        try (BufferedReader br = new BufferedReader(new StringReader(sb.toString()))) {
            if (br.readLine() != null) {
                br.lines()
                        .parallel()
                        .forEach(it -> System.out.println(Thread.currentThread() + " : " + it));
            }
        }
    }  
}
于 2013-12-20T17:12:59.950 回答