4

我想从 JPA 存储库创建一个流。目标是将来自回购的实体(可能超过一百万)映射到其他实体,这些实体又将存储在另一个回购中。

到目前为止,我构建了一个收集器,它将收集给定数量(例如 1000 个)实体,然后将它们存储到目标存储库中。这将在并行流中工作。我现在需要的是一种从源 Repo 获取实体并在需要时将它们输入 Stream 的好方法。

到目前为止最有希望的是实现一个供应商(http://docs.oracle.com/javase/8/docs/api/java/util/function/Supplier.html)来通过生成构建流,但我没有找到当对源 Repo 的查询不提供另一个实体时终止进程的方法。

任何指针?

4

4 回答 4

3

我们最近在 Spring Data Fowler Release Train 的最新 RC1 版本中在 Spring Data JPA(和 MongoDB)中添加了对此的支持。

委托默认方法返回的 Stream 示例 “真实流” Stream 的示例

于 2015-03-09T08:31:36.647 回答
2
于 2014-09-18T11:49:07.157 回答
0

一个简单的例子可能是:

  @Repository
  public class MyEntityRepository extends CrudRepository<MyEntity, Long> {           
  }

  @Component
  public class MyEntityService {

       @Autowired
       private MyEntityRepository myEntityRepository;


       public void() {
            // if the findAll() method returns List
            Stream<MyEntity> streamFromList = myEntityRepository.findAll().stream();


            // if the findAll() method returns Iterable
            Stream<MyEntity> streamFromIterable = StreamSupport.stream(myEntityRepository.findAll().spliterator(), true);

       }
  } 
于 2014-09-17T19:51:26.660 回答
0

好的,感谢所有的贡献。我结合了所说的并实施了我需要的。也许实施会澄清我想从什么开始。

我创建了两个类,RepositryCollectorRepositorySpliterator

public class RepositoryCollector<T> implements Collector<T, Tuple2<Integer,List<T>>, Integer>{

    private JpaRepository<T, ?> repository;
    private int threshold;

    public BinaryOperator<Tuple2<Integer, List<T>>> combiner() {
        return (listTuple, itemsTuple) -> {
            List<T> list = listTuple._2;
            List<T> items = itemsTuple._2;
            list.addAll(items);
            int sum = listTuple._1 + itemsTuple._1;
            if(list.size() >= this.threshold){
                this.repository.save(list);
                this.repository.flush();
                list = new LinkedList<>();
            }
            return new Tuple2<>(sum, list);
        };
    }
}

我省略了收集器所需的其他功能,因为所有相关信息都存在于组合器中。Spliterator 也是如此。

public class RepositorySpliterator<T> implements Spliterator<T> {

    private Slice<T> slice;
    private Function<Pageable, Slice<T>> getSlice;
    private Iterator<T> sliceIterator;

    public RepositorySpliterator(Pageable pageable, Function<Pageable, Slice<T>> getSlice) {
        this.getSlice = getSlice;
        this.slice = this.getSlice.apply(pageable);
        this.sliceIterator = slice.iterator();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if(sliceIterator.hasNext()) {
            action.accept(sliceIterator.next());
            return true;
        } else if (slice.hasNext()) {
            this.slice = getSlice.apply(slice.nextPageable());
            this.sliceIterator = this.slice.iterator();
            if(sliceIterator.hasNext()){
                action.accept(sliceIterator.next());
                return true;
            }
        }
        return false;
    }

    public Stream<T> getStream(boolean parallel){
        return StreamSupport.stream(this, parallel);
    }
}

如您所见,我放入了一个辅助函数来生成我需要的 Stream。也许这有点草率但是......嗯。

所以现在我只需要在我的映射类中添加几行代码就可以了。

    public void start(Timestamp startTimestamp, Timestamp endTimestamp) {
        new RepositorySpliterator<>(
                new PageRequest(0, 10000), pageable -> sourceRepository.findAllBetween(startTimestamp, endTimestamp, pageable))
                .getStream(true)
                .map(entity -> mapToTarget(endTimestamp, entity))
                .collect(new RepositoryCollector<>(targetRepository, 1000));
    }

映射器将从源中获取 10000 个实体,将它们倒入流池中,以便对其进行映射和存储。每当其中一个流用完新实体时,将获取新批次并将其馈送到同一流池中。

如果我的实施中有明显的错误,请随时发表评论和改进!

于 2014-09-22T09:42:56.683 回答