好的,感谢所有的贡献。我结合了所说的并实施了我需要的。也许实施会澄清我想从什么开始。
我创建了两个类,RepositryCollector和RepositorySpliterator。
public class RepositoryCollector<T> implements Collector<T, Tuple2<Integer,List<T>>, Integer>{
private JpaRepository<T, ?> repository;
private int threshold;
public BinaryOperator<Tuple2<Integer, List<T>>> combiner() {
return (listTuple, itemsTuple) -> {
List<T> list = listTuple._2;
List<T> items = itemsTuple._2;
list.addAll(items);
int sum = listTuple._1 + itemsTuple._1;
if(list.size() >= this.threshold){
this.repository.save(list);
this.repository.flush();
list = new LinkedList<>();
}
return new Tuple2<>(sum, list);
};
}
}
我省略了收集器所需的其他功能,因为所有相关信息都存在于组合器中。Spliterator 也是如此。
public class RepositorySpliterator<T> implements Spliterator<T> {
private Slice<T> slice;
private Function<Pageable, Slice<T>> getSlice;
private Iterator<T> sliceIterator;
public RepositorySpliterator(Pageable pageable, Function<Pageable, Slice<T>> getSlice) {
this.getSlice = getSlice;
this.slice = this.getSlice.apply(pageable);
this.sliceIterator = slice.iterator();
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if(sliceIterator.hasNext()) {
action.accept(sliceIterator.next());
return true;
} else if (slice.hasNext()) {
this.slice = getSlice.apply(slice.nextPageable());
this.sliceIterator = this.slice.iterator();
if(sliceIterator.hasNext()){
action.accept(sliceIterator.next());
return true;
}
}
return false;
}
public Stream<T> getStream(boolean parallel){
return StreamSupport.stream(this, parallel);
}
}
如您所见,我放入了一个辅助函数来生成我需要的 Stream。也许这有点草率但是......嗯。
所以现在我只需要在我的映射类中添加几行代码就可以了。
public void start(Timestamp startTimestamp, Timestamp endTimestamp) {
new RepositorySpliterator<>(
new PageRequest(0, 10000), pageable -> sourceRepository.findAllBetween(startTimestamp, endTimestamp, pageable))
.getStream(true)
.map(entity -> mapToTarget(endTimestamp, entity))
.collect(new RepositoryCollector<>(targetRepository, 1000));
}
映射器将从源中获取 10000 个实体,将它们倒入流池中,以便对其进行映射和存储。每当其中一个流用完新实体时,将获取新批次并将其馈送到同一流池中。
如果我的实施中有明显的错误,请随时发表评论和改进!