我有一个 Rx 流,它是来自特定组件的传出更改源。
有时我希望能够在流上进入批处理模式,以便传递给onNext的项目累积并仅在退出批处理模式时传递。
通常我会通过流传递单个项目:
stream.onNext(1);
stream.onNext(2);
在传递给onNext的项目和在subscribe中接收的项目之间存在一对一的映射,因此前面的代码片段导致两次调用subscribe的值分别为 1 和 2。
我正在寻找的批处理模式可能会像这样工作:
stream.enterBatchMode();
stream.onNext(1);
stream.onNext(2);
stream.exitBatchMode();
在这种情况下,我希望仅使用单个串联数组 [1, 2] 调用一次订阅。
重申一下,我有时只需要批处理模式,有时我会传递未连接的项目。
如何使用 Rx 实现这种行为?
注意:以前我通过onNext传递数组,尽管这主要是为了使在单独模式和批处理模式下类型保持不变。
例如:
stream.onNext([1, 2, 3]);
stream.onNext([4, 5, 6]);
订阅接收 [1, 2, 3] 然后 [4, 5, 6],但是在批处理模式下订阅接收连接结果 [1, 2, 3, 4, 5, 6]。
因此,当您以这种方式看待它时,它更像是unconcatenated和concatenated模式(而不是individual和batch-mode)。但我认为问题非常相似。