6

This is a classic implementation of an immutable linked list:

public abstract class List<A> implements Iterable<A> {
    private static final List NIL = new Nil();

    public abstract A head();
    public abstract List<A> tail();
    public List<A> cons(A a) { return new Cons<>(a, this); }

    public static <A> List<A> nil() { return NIL; }

    @Override
    public Iterator<A> iterator() {
        return new Iterator<A>() {
            private List<A> list = List.this;

            @Override
            public boolean hasNext() {
                return list != NIL;
            }

            @Override
            public A next() {
                A n = list.head();
                list = list.tail();
                return n;
            }
        };
    }

    public Stream<A> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    public Stream<A> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }
}

class Nil extends List {
    @Override public Object head() { throw new NoSuchElementException(); }
    @Override public List tail() { throw new NoSuchElementException(); }
}

class Cons<A> extends List<A> {
    private final A head;
    private final List<A> tail;

    Cons(A head, List<A> tail) {
        this.head = head;
        this.tail = tail;
    }

    @Override public A head() { return head; }
    @Override public List<A> tail() { return tail; }
}

The default implementation of spliterator() does not support efficient parallelizing:

List<Integer> list = List.<Integer> nil().cons(3).cons(2).cons(1);

list.parallelStream().forEach(i -> {
    System.out.println(i);
    try {
        Thread.sleep(1000);
    } catch (Exception e) {
        e.printStackTrace();
    }
});

This will print 1, 2, 3 sequentially.

How to implement spliterator() to support efficient parallelizing?

4

2 回答 2

3

甚至无法报告估计大小(这是 的默认实现Iterable)的拆分器被并行管道很好地拆分。如果您跟踪List. 在您的情况下,跟踪确切大小并不难:

public abstract class List<A> implements Iterable<A> {
    ...
    public abstract long size();

    @Override
    public Spliterator<A> spliterator() {
        return Spliterators.spliterator(iterator(), size(), Spliterator.ORDERED);
    }
}

class Nil extends List {
    ...
    public long size() {
        return 0;
    }
}

class Cons<A> extends List<A> {
    ...
    private final long size;

    Cons(A head, List<A> tail) {
        this.head = head;
        this.tail = tail;
        this.size = tail.size()+1;
    }

    ...

    @Override
    public long size() {
        return size;
    }
}

之后并行化会更好。请注意,它仍然是较差的人并行化,因为您无法快速跳转到列表的中间,但在许多情况下它会提供合理的加速。

另请注意,最好明确指定Spliterator.ORDERED特征。否则,即使明确请求(例如,通过forEachOrdered()终端操作),并行流操作中也可能会忽略该顺序。

于 2015-08-17T08:10:36.610 回答
1

您可能会使用一些具有交错的算法- 例如,计算元素并在整数除法后使用余数。这可以拆分元素以进行并行迭代。

您也可以在构建迭代器之前迭代一次,将列表拆分为interval,但这会超出 stream 的目的 - 例如,如果您将其用于anyMatch,它会减慢您的速度。

没有真正有效的方法来拆分链表(在少于线性的时间内),除非您创建自己的链表实现,其中包含一些附加信息。

编辑:哦等等 - 你只实现Iterable. 这是相当有限的,你必须想出一个只有一次通过的算法。这意味着,拆分本身根本不会并行,因此您不妨在流程的其他地方强制执行并行性。

于 2015-08-17T07:49:47.167 回答