5

在第一个关于深入理解 Java 流拆分器的问题之后另一个关于流的微妙问题:为什么.flatMap()Java 中的实现效率如此低下(非懒惰)?

通常流应该尽可能地懒惰,但.flatMap()方法不是。

例如:

stream.flatMap(this::getStreamWith10HeavyComputationElems).firstFirst()在返回第一个重计算结果之前将消耗 10 个元素(10 个重计算)。

stream.flatMap(this::getStreamWith10HeavyComputationElems).limit(11).count()在返回 11 之前将消耗 20 个元素(2x10 繁重的计算)。

问题是为什么 Java使用非惰性实现?

    @Test
    void flatMap_native() throws Exception {
        AtomicInteger count = new AtomicInteger();
        Stream<Long> stream = LongStream.range(0, 5).boxed()
                .flatMap(num -> LongStream.range(0, 10).boxed()
                                    .peek(x -> count.incrementAndGet()))
                .limit(11);

        assertThat(stream).hasSize(11);
        assertThat(count).hasValue(20); //!why? - should be 11!
    }

作为解决方法,我创建了自己的 flatMap 实现,但与 native 调用相比它缺乏流畅性:flatMap(stream, mapper)vs native stream.flatMap(mapper)

public static <T, R> Stream<R> flatMap(Stream<? extends T> stream, Function<? super T, ? extends Stream<? extends R>> mapper) {
    // Outside the class to be able to close it, starts with stream.empty
    AtomicReference<Stream<? extends R>> flatMapStreamRef = new AtomicReference<>(Stream.empty());

    // Defining a better spliterator than the native flatMap one.
    class FlatMapSpliterator implements Spliterator<R> {
        private final AtomicReference<T> item = new AtomicReference<>();
        private final Spliterator<? extends T> spliterator;
        private Stream<? extends R> flatMapStream = flatMapStreamRef.get();
        private Spliterator<? extends R> flatMapSpliterator = flatMapStream.spliterator();

        private FlatMapSpliterator(Spliterator<? extends T> spliterator) {
            this.spliterator = spliterator;
        }

        @Override
        public boolean tryAdvance(Consumer<? super R> action) {
            while(true) {
                if (flatMapSpliterator.tryAdvance(action)) {
                    return true;
                }
                if (!spliterator.tryAdvance(item::set)) {
                    return false; // nothing more to process
                }
                Stream<? extends R> stream = mapper.apply(item.get());
                if(stream != null) {
                    flatMapStream.close();
                    flatMapStream = stream;
                    flatMapStreamRef.set(stream);
                    flatMapSpliterator = flatMapStream.spliterator();
                }
            }
        }

        @Override
        @SuppressWarnings("unchecked")
        public Spliterator<R> trySplit() {
            Spliterator<? extends R> subFlatMapSpliterator = flatMapSpliterator.trySplit();
            if(subFlatMapSpliterator != null) {
                return (Spliterator<R>) subFlatMapSpliterator;
            }

            Spliterator<? extends T> subSpliterator = spliterator.trySplit();
            if(subSpliterator == null) {
                return null;
            }

            return new FlatMapSpliterator(subSpliterator);
        }

        @Override
        public long estimateSize() {
            // If both estimate size are Long.MAX_VALUE then math overflow will happen
            long estimateSize = spliterator.estimateSize() + flatMapSpliterator.estimateSize();
            return estimateSize < 0 ? Long.MAX_VALUE : estimateSize;
        }

        @Override
        public int characteristics() {
            // Maintain only ORDERED (used by native flatMap)
            return spliterator.characteristics() & ORDERED;
        }
    }

    return StreamSupport.stream(new FlatMapSpliterator(stream.spliterator()), stream.isParallel())
            .onClose(stream::close)
            .onClose(flatMapStreamRef.get()::close);
}
4

0 回答 0