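I would like to process a Java stream that reads from a source that has to be accessed in pages. As a first approach, I implemented a paging iterator that requests a page only when the current page runs out of items, wrapped it with `Spliterators.spliteratorUnknownSize`, and passed the result to `StreamSupport.stream(spliterator, false)` to get a stream over the iterator.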
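For concreteness, the naive approach described above might look roughly like the sketch below. `PagingIterator` and its `fetchPage(offset, limit)` function are hypothetical stand-ins, not the actual code, and the sketch assumes that an empty page signals the end of the results. Note that `StreamSupport.stream` takes a `Spliterator`, not an `Iterator`, so the iterator has to be wrapped first.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Sketch of the naive approach: a page is fetched only when the current one
// is exhausted. fetchPage(offset, limit) is a hypothetical page source; an
// empty page is assumed to mean there are no more results.
final class PagingIterator<T> implements Iterator<T> {
    private final BiFunction<Long, Long, List<T>> fetchPage;
    private final long pageSize;
    private long offset = 0;
    private Iterator<T> current = null;
    private boolean exhausted = false;

    PagingIterator(long pageSize, BiFunction<Long, Long, List<T>> fetchPage) {
        this.pageSize = pageSize;
        this.fetchPage = fetchPage;
    }

    @Override
    public boolean hasNext() {
        // Keep fetching until an element is available or the source reports no more.
        while (!exhausted && (current == null || !current.hasNext())) {
            List<T> page = fetchPage.apply(offset, pageSize);
            if (page.isEmpty()) {
                exhausted = true;
            } else {
                offset += page.size();
                current = page.iterator();
            }
        }
        return !exhausted;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    // StreamSupport.stream needs a Spliterator, so the iterator is wrapped in the
    // JDK's generic iterator-backed spliterator of unknown size.
    static <T> Stream<T> stream(long pageSize, BiFunction<Long, Long, List<T>> fetchPage) {
        Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(
                new PagingIterator<>(pageSize, fetchPage), Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false);
    }
}
```

Because the wrapping spliterator can only obtain elements by calling `next()` on one shared iterator, every page query goes through that single iterator and runs one after another, even if the stream is made parallel.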
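Because my pages turned out to be very expensive to fetch, I would like to access them through a parallel stream. At this point I discovered that, because of the spliterator implementation Java provides directly from an iterator, my naive approach achieves essentially no parallelism. Since I actually know quite a lot about the elements I want to traverse (after requesting the first page I know the total number of results, and the source supports offset and limit), I think it should be possible to implement my own spliterator that achieves real concurrency, both in the work done on a page's elements and in the page queries themselves.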
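The observation that a known total plus offset/limit support should allow real splitting can be made concrete with a small, hypothetical helper: any half-open range of result indices can be cut on a page boundary, after which each half can query its own pages independently. `PageSplit.splitPoint` is an illustrative name, and the sketch assumes the start of the range is itself page-aligned.

```java
// Hypothetical helper: given a half-open range [start, end) of result indices
// and a page size, pick a split point on a page boundary so that each half
// covers whole pages and can fetch them independently. Assumes start is
// page-aligned (a multiple of pageSize from offset 0).
final class PageSplit {
    private PageSplit() {}

    /** Returns a page-aligned split point inside (start, end), or -1 if the range spans at most one page. */
    static long splitPoint(long start, long end, long pageSize) {
        long pages = (end - start + pageSize - 1) / pageSize; // pages in the range, rounded up
        if (pages < 2) {
            return -1;
        }
        return start + (pages / 2) * pageSize;
    }

    public static void main(String[] args) {
        // 1000 results in pages of 100: the first split yields [0, 500) and [500, 1000).
        System.out.println(splitPoint(0, 1000, 100));   // 500
        System.out.println(splitPoint(500, 1000, 100)); // 700
        System.out.println(splitPoint(900, 1000, 100)); // -1
    }
}
```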
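I have been able to implement concurrency for the work done on the elements quite easily, but in my initial implementation pages are queried only by the top-most spliterator, so the queries do not benefit from the division of work provided by the fork-join implementation.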
How can I write a spliterator that achieves both goals?
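One possible direction, offered as a sketch under stated assumptions rather than a definitive answer: instead of splitting off the already-fetched page, let each spliterator own a range of offsets and fetch its own pages lazily in `tryAdvance`. Splitting then divides the page queries as well as the elements, because each leaf's queries run on whichever fork-join worker traverses it. `RangePagingSpliterator` is a hypothetical name; the sketch assumes the total is known before streaming starts (for example from the first page), that `fetchPage` is thread-safe, and that the source returns exactly `min(limit, remaining)` results per query.

```java
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Sketch: a spliterator over result indices [fetchOffset, end) of a paged source
// with a known total. trySplit cuts the not-yet-fetched range on a page boundary;
// pages are fetched lazily by the thread that traverses a range. Assumes
// fetchPage(offset, limit) is thread-safe and returns exactly min(limit, remaining)
// elements. All names here are hypothetical.
final class RangePagingSpliterator<T> implements Spliterator<T> {
    private final BiFunction<Long, Long, List<T>> fetchPage;
    private final long pageSize;
    private final long end;          // exclusive end of this spliterator's range
    private long fetchOffset;        // offset of the next page to fetch
    private List<T> page = Collections.emptyList(); // current, already fetched page
    private int pageIndex = 0;       // next element to emit from the current page

    RangePagingSpliterator(BiFunction<Long, Long, List<T>> fetchPage, long pageSize, long start, long end) {
        this.fetchPage = fetchPage;
        this.pageSize = pageSize;
        this.fetchOffset = start;
        this.end = end;
    }

    static <T> Stream<T> stream(long total, long pageSize, BiFunction<Long, Long, List<T>> fetchPage) {
        return StreamSupport.stream(new RangePagingSpliterator<>(fetchPage, pageSize, 0, total), true);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (pageIndex >= page.size()) {
            if (fetchOffset >= end) {
                return false;
            }
            long limit = Math.min(pageSize, end - fetchOffset);
            page = fetchPage.apply(fetchOffset, limit); // query runs on the current worker thread
            pageIndex = 0;
            fetchOffset += limit;
            if (page.isEmpty()) {
                return false; // source returned fewer results than expected
            }
        }
        action.accept(page.get(pageIndex++));
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        long pages = (end - fetchOffset + pageSize - 1) / pageSize;
        if (pages < 2) {
            return null; // a single page is not worth splitting
        }
        long mid = fetchOffset + (pages / 2) * pageSize;
        // ORDERED requires the returned spliterator to cover a prefix, so it takes
        // any buffered elements plus [fetchOffset, mid); this one keeps [mid, end).
        RangePagingSpliterator<T> prefix = new RangePagingSpliterator<>(fetchPage, pageSize, fetchOffset, mid);
        prefix.page = page;
        prefix.pageIndex = pageIndex;
        page = Collections.emptyList();
        pageIndex = 0;
        fetchOffset = mid;
        return prefix;
    }

    @Override
    public long estimateSize() {
        // Exact under the assumption above: unfetched results plus what is left of the page.
        return (end - fetchOffset) + (page.size() - pageIndex);
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED;
    }
}
```

Pages inside one leaf are still fetched sequentially, so how much query-level parallelism this yields depends on how far the stream framework decides to split, which it bases on `estimateSize()` and the common pool's parallelism.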
For reference, here is what I have done so far (I know it does not divide the queries appropriately):
```java
import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;

public final class PagingSourceSpliterator<T> implements Spliterator<T> {
    public static final long DEFAULT_PAGE_SIZE = 100;

    private Page<T> result;
    private Iterator<T> results;
    private boolean needsReset = false;
    private final PageProducer<T> generator;
    private long offset = 0L;
    private long limit = DEFAULT_PAGE_SIZE;

    public PagingSourceSpliterator(PageProducer<T> generator) {
        this.generator = generator;
    }

    public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
        this.generator = generator;
        this.limit = pageSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (hasAnotherElement()) {
            if (!results.hasNext()) {
                loadPageAndPrepareNextPaging();
            }
            if (results.hasNext()) {
                action.accept(results.next());
                return true;
            }
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        // If we know there's another page, hand off whatever remains of the
        // current page as a new spliterator for other threads to work on, and
        // mark that the next time something is requested from this spliterator
        // it needs to be reset to the head of the next page.
        if (hasAnotherPage()) {
            Spliterator<T> other = result.getPage().spliterator();
            needsReset = true;
            return other;
        } else {
            return null;
        }
    }

    @Override
    public long estimateSize() {
        if (limit == 0) {
            return 0;
        }
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        // SIZED requires the exact number of elements *this* spliterator will still
        // produce: the unfetched results plus what is left of the current page
        // (consumed elements are removed from the page list as we go).
        return (result.getTotalResults() - offset) + result.getPage().size();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
    }

    private boolean hasAnotherElement() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (results.hasNext() || hasAnotherPage());
    }

    private boolean hasAnotherPage() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (result.getTotalResults() > offset);
    }

    private boolean isBound() {
        return Objects.nonNull(results) && Objects.nonNull(result);
    }

    private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
        ensureBound();
        ensureResetIfNecessary();
    }

    private void ensureBound() {
        if (!isBound()) {
            loadPageAndPrepareNextPaging();
        }
    }

    private void ensureResetIfNecessary() {
        if (needsReset) {
            loadPageAndPrepareNextPaging();
            needsReset = false;
        }
    }

    private void loadPageAndPrepareNextPaging() {
        // keep track of the overall result so that we can reference the original list and total size
        this.result = generator.apply(offset, limit);
        // make sure that the iterator we use to traverse a single page removes
        // results from the underlying list as we go so that we can simply pass
        // off the list spliterator for the trySplit rather than constructing a
        // new kind of spliterator for what remains.
        this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
            @Override
            public T next() {
                T next = super.next();
                this.remove();
                return next;
            }
        };
        // update the paging for the next request and inquiries prior to the next request
        // we use the page of the actual result set instead of the limit in case the limit
        // was not respected exactly.
        this.offset += result.getPage().size();
    }

    public static class DelegatingIterator<T> implements Iterator<T> {
        private final Iterator<T> iterator;

        public DelegatingIterator(Iterator<T> iterator) {
            this.iterator = iterator;
        }

        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public T next() {
            return iterator.next();
        }

        @Override
        public void remove() {
            iterator.remove();
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            iterator.forEachRemaining(action);
        }
    }
}
```
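The hand-off in `trySplit` above is only valid for an `ORDERED` spliterator because what it returns covers a prefix: the `Spliterator.trySplit` contract requires that, for an `ORDERED` spliterator, the returned spliterator cover a strict prefix of the elements while the original keeps the rest. A small demonstration with `ArrayList`'s spliterator (the same kind that `result.getPage().spliterator()` returns) follows; `PrefixSplitDemo` is just an illustrative name, and splitting at the midpoint is OpenJDK's behavior rather than a documented guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Illustrates the ORDERED trySplit contract: the returned spliterator covers a
// prefix and the original keeps the suffix.
final class PrefixSplitDemo {
    static List<Integer> drain(Spliterator<Integer> s) {
        List<Integer> out = new ArrayList<>();
        s.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7));
        Spliterator<Integer> suffix = list.spliterator();
        Spliterator<Integer> prefix = suffix.trySplit(); // OpenJDK splits at the midpoint
        System.out.println(drain(prefix)); // [0, 1, 2, 3]
        System.out.println(drain(suffix)); // [4, 5, 6, 7]
    }
}
```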
And my page source:
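```java
import java.util.function.BiFunction;

// Produces the page starting at the given offset (first argument) containing
// at most the given number of results (second argument).
public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {
}
```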
And a page:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public final class Page<T> {
    private long totalResults;
    private final List<T> page = new ArrayList<>();

    public long getTotalResults() {
        return totalResults;
    }

    public List<T> getPage() {
        return page;
    }

    // Return Page<T> rather than the raw type so the fluent calls stay type-safe.
    public Page<T> setTotalResults(long totalResults) {
        this.totalResults = totalResults;
        return this;
    }

    public Page<T> setPage(List<T> results) {
        this.page.clear();
        this.page.addAll(results);
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Page)) {
            return false;
        }
        Page<?> page1 = (Page<?>) o;
        return totalResults == page1.totalResults && Objects.equals(page, page1.page);
    }

    @Override
    public int hashCode() {
        return Objects.hash(totalResults, page);
    }
}
```
And an example of obtaining a stream with "slow" paging for testing:
```java
private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
    PageProducer<T> producer = (offset, limit) -> {
        try {
            Thread.sleep(5000L); // simulate an expensive page query
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int beginIndex = offset.intValue();
        int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
        return new Page<T>().setTotalResults(things.size())
                .setPage(things.subList(beginIndex, endIndex));
    };
    return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}
```