3

I'm a little confused by all my research. I have custom interface called TabularResultSet (which I've watered down for the sake of example) which traverses through any data set that is tabular in nature. It has a next() method like an iterator and it can be looping through a QueryResultSet, a tabbed-table from a clipboard, a CSV, etc...

However, I'm trying to create a Spliterator that wraps around my TabularResultSet and easily turns it into a stream. I cannot imagine a safe way to parallelize because the TabularResultSet could be traversing a QueryResultSet, and calling next() concurrently could wreak havoc. The only way I imagine parallelization can be done safely is to have the next() called by a single working thread and it passes the data off to a parallel thread to work on it.

So I think parallelization is not an easy option. How do I just get this thing to stream without parallelizing? Here is my work so far...

public final class SpliteratorTest {

    public static void main(String[] args) {
       TabularResultSet rs = null; /* instantiate an implementation; */

       Stream<TabularResultSet> rsStream = StreamSupport.stream(new TabularSpliterator(rs), false);
    }

    public static interface TabularResultSet {
        public boolean next();

        public List<Object> getData();
    }

    private static final class TabularSpliterator implements Spliterator<TabularResultSet> {

        private final TabularResultSet rs;

        public TabularSpliterator(TabularResultSet rs) {
            this.rs = rs;
        }
        @Override
        public boolean tryAdvance(Consumer<? super TabularResultSet> action) {
            action.accept(rs);
            return rs.next();
        }

        @Override
        public Spliterator<TabularResultSet> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}
4

2 回答 2

5

扩展它可能是最容易的Spliterators.AbstractSpliterator。如果你这样做,你只需要实现tryAdvance. 这可以变成并行流;并行性来自tryAdvance多次调用流实现,将接收到的数据进行批处理,并在不同的线程中进行处理。

如果TabularResultSet是 JDBC 之类的东西ResultSet,我认为您不需要 aSpliterator<TabularResultSet>Stream<TabularResultSet>. 相反,它看起来 aTabularResultSet代表整个表格数据集,因此您可能希望每个拆分器或流元素代表该表中的一行 -List<Object>getData()? 如果是这样,您需要类似以下内容。

class TabularSpliterator extends Spliterators.AbstractSpliterator<List<Object>> {
    private final TabularResultSet rs;

    public TabularSpliterator(TabularResultSet rs) {
        super(...);
        this.rs = rs;
    }

    @Override public boolean tryAdvance(Consumer<? super List<Object>> action) {
        if (rs.next()) {
            action.accept(rs.getData());
            return true;
        } else {
            return false;
        }
    }
}

然后,您可以通过调用将此拆分器的实例转换为流StreamSupport.stream()

注意:通常,Spliterator 实例不会从多个线程中调用,甚至不需要是线程安全的。有关详细信息,请参阅“尽管...”开头的段落中的Spliterator 类文档。

于 2015-03-09T05:14:17.647 回答
1

你大部分时间都在。您现在要做的就是将 Spliterator 转换为 Stream。您可以使用StreamSupport.stream(Spliterator, boolean)方法来做到这一点。布尔参数是您是否要进行并行流式处理的标志(您需要 false,因为不并行)

如果您的 TabularResultSet 实现了 Iterator,您可以使用该Spliterators.spliteratorUnknownSize()方法将 Iterator 转换为 Spliterator,它基本上可以完成您上面的代码所做的事情。

不确定是否值得添加特征,但您可能需要考虑 Spliterator.IMMUTABLE| Spliterator.ORDERED | Spliterator.NONNULL

祝你好运

于 2015-03-09T01:23:07.087 回答