7

我一直在寻找有关如何在 rx 中使用 Observable.Buffer 的示例,但找不到比样板时间缓冲的东西更重要的东西。

似乎确实存在指定“bufferClosingSelector”的重载,但我无法理解它。

我要做的是创建一个按时间或“累积”缓冲的序列。考虑一个请求流,其中每个请求都有某种权重,我不想一次处理超过 x 个累积权重,或者如果累积的权重不够,就给我最后一个时间范围内的内容(常规缓冲区功能)

4

1 回答 1

15

bufferClosingSelector是一个每次调用来获取一个 Observable 的函数,它会在缓冲区被关闭时产生一个值。

例如,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))像常规Buffer(time)过载一样工作。

如果您想对序列进行加权,您可以Scan在序列上应用 a ,然后决定您的聚合条件。

例如,source.Scan((a,c) => a + c).SkipWhile(a => a < 100)给你一个序列,当源序列加起来超过 100 时产生一个值。

您可以使用Amb这两个关闭条件来竞争,以查看哪个首先反应:

        .Buffer(() => Observable.Amb
                     (
                          Observable.Timer(TimeSpan.FromSeconds(1)), 
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )

您可以使用任何一系列组合器,这些组合器为要在该点关闭的缓冲区生成任何值。

注意: 给关闭选择器的值无关紧要 - 重要的是通知。所以要结合不同类型的来源,Amb只需将其更改为System.Reactive.Unit.

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
于 2012-03-05T21:34:29.437 回答