9

我实际上试图回答这个问题如何跳过从 Files.lines 获得的 Stream<String> 的偶数行。所以我虽然这个收集器不能很好地并行工作:

private static Collector<String, ?, List<String>> oddLines() {
    int[] counter = {1};
    return Collector.of(ArrayList::new,
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
}

但它有效。

编辑:它实际上并没有工作;我被我的输入集太小而无法触发任何并行性这一事实愚弄了。见评论中的讨论

我认为这行不通,因为我想到了以下两个处决计划。


1.counter数组在所有线程之间共享。

线程 t1 读取 Stream 的第一个元素,所以满足 if 条件。它将第一个元素添加到其列表中。然后在他有时间更新数组值之前执行停止。

线程 t2,表示从流的第 4 个元素开始,将其添加到其列表中。所以我们最终得到了一个不需要的元素。

当然,既然这个收集器似乎可以工作,我想它不会那样工作。无论如何,更新都不是原子的。


2.每个Thread都有自己的数组副本

在这种情况下,更新没有更多问题,但没有什么能阻止我线程 t2 不会从流的第 4 个元素开始。所以他也不会那样工作。


所以看起来它根本不像那样工作,这让我想到了一个问题……收集器是如何并行使用的?

有人可以基本上解释一下它是如何工作的,以及为什么我的收集器在并行运行时会工作吗?

非常感谢!

4

2 回答 2

5

parallel()将源流传递到您的收集器就足以破坏逻辑,因为您的共享状态 ( counter)可能会从不同的任务中递增。您可以验证这一点,因为它永远不会为任何有限流输入返回正确的结果:

    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
    System.out.println(lines.isParallel());
    lines = lines.parallel();
    System.out.println(lines.isParallel());

    List<String> collected = lines.collect(oddLines());

    System.out.println(collected.size());

请注意,对于无限流(例如从 读取时Files.lines()),您需要在流中生成大量数据,因此它实际上分叉了一个任务以同时运行一些块。

我的输出是:

false
true
12386

这显然是错误的。


CONCURRENT正如评论中的@Holger 正确指出的那样,当您的收集器指定and时,可能会发生不同的竞争UNORDERED,在这种情况下,它们在跨任务的单个共享集合上运行(ArrayList::new每个流调用一次),其中只有parallel()它将对每个任务的集合运行累加器,然后使用您定义的组合器组合结果。

如果您将特征添加到收集器,由于单个集合中的共享状态,您可能会遇到以下结果:

false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
    at java.util.ArrayList.add(ArrayList.java:459)
    at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
    at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
    at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
    at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386
于 2015-05-11T15:39:02.637 回答
3

实际上,这个收藏家的工作只是一个巧合。它不适用于自定义数据源。考虑这个例子:

List<String> list = IntStream.range(0, 10).parallel().mapToObj(String::valueOf)
        .collect(oddLines());
System.out.println(list);

这总是产生不同的结果。真正的原因只是因为当BufferedReader.lines()流被至少java.util.Spliterators.IteratorSpliterator.BATCH_UNIT1024行分割时。如果你有更多的行数,它甚至可能会失败BufferedReader

String data = IntStream.range(0, 10000).mapToObj(String::valueOf)
    .collect(Collectors.joining("\n"));
List<String> list = new BufferedReader(new StringReader(data)).lines().parallel()
    .collect(oddLines());
list.stream().mapToInt(Integer::parseInt).filter(x -> x%2 != 0)
    .forEach(System.out::println);

收集器是否正常工作,这不应该打印任何东西。但有时它会打印出来。

于 2015-05-11T15:56:28.897 回答