0

假设我有一个看起来像这样的可观察对象(这是 Python,但应该对所有语言都通用):

rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])

我希望最终能够将整数批处理成长度为 5 的列表,并能够将它们传递给函数,所以是这样的:

batch_function([1,2,3,4,5])
batch_function([6,7,8,9,10])

实际上,传入的数据将是(可能为空的)列表的无限流。我只是想确保batch_function在累积 5 个实际值之前不会进行后续调用。谢谢你的帮助。

4

1 回答 1

0

以下代码段对我有用,使用buffer_with_count. 不过,我不确定是否有更简洁的方法来做到这一点,即没有flat_map.

BUFFER_COUNT=5
rx.Observable.from_iterable(iter(get_items())) \
  .flat_map(lambda it: it) \
  .buffer_with_count(BUFFER_COUNT) \
  .map(lambda my_array: do_something_with(my_array)) \
  .subscribe(lambda it: print(it))
于 2017-08-31T18:54:24.207 回答