3

使用 cyclops-react 1.0.0-RC3,我尝试使用批处理重新创建 cyclops-react流用户指南中的示例。我发现其中缺少一些方法ReactiveSeq,包括batchBySizewindowByTime

我确实找到了这些方法StreamUtils并且它们按预期工作,但看起来不像用户指南中的示例那么光滑......

从用户指南...

// Example 19. Batch by size example
ReactiveSeq.of(1,2,3,4,5, 6)
  .map(n-> n==6? sleep(1) : n)
  .batchBySize(4) // this function seems to be missing...
  .toList()

我能得到什么工作......

import com.aol.cyclops.control.ReactiveSeq;
// ...
StreamUtils.batchBySize(
    ReactiveSeq.of(1, 2, 3, 4, 5, 6)
        .map(n -> TestUtils.mayBeSlow(n)),
    4)
    .collect(Collectors.toList());

testBatchingSlidingWindowing您可以在方法测试类StreamsTest.java中的工作 JUnit 中查看我的代码

我应该期望找到batchBySizeand windowByTimeonReactiveSeq还是使用StreamUtils适当的方式?

4

1 回答 1

3

改为使用分组。它适用于所有 cyclops-react Traversable 类型(例如 ListX、SetX、QueueX、DequeX、ReactiveSeq 等)。所以你的例子会变成

ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .grouped(4) 
           .toList()

groupedXXX 运算符的作用类似于 batchByXXX 和 windowByXXX,通过扩展的 Collection 类型提供对分组数据的访问,该类型本身具有所有可遍历和可折叠的运算符。

例如加倍例如组/批处理列表的成员

 ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .grouped(4) 
           .map(list-> list.map(i->i*2))
           .toList() 

您还可以使用返回 ListTransformer 的 groupedT。ListTransformers 允许您像操作嵌套结构一样操作嵌套结构。

例如使用 groupedT 加倍例如组成员/批处理列表

ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .groupedT(4) 
           .map(i->i*2);

并将 ListTransformer 转换回列表流

ListTSeq<Integer> listT = ReactiveSeq.of(1,2,3,4,5, 6)
                                     .map(n-> n==6? sleep(1) : n)
                                     .groupedT(4);

ReactiveSeq<ListX<Integer>> nested = listT.toNestedListX()
                                          .stream(); 
于 2016-05-23T17:32:18.443 回答