我一直在寻找有关如何在 rx 中使用 Observable.Buffer 的示例,但找不到比样板时间缓冲的东西更重要的东西。
似乎确实存在指定“bufferClosingSelector”的重载,但我无法理解它。
我要做的是创建一个按时间或“累积”缓冲的序列。考虑一个请求流,其中每个请求都有某种权重,我不想一次处理超过 x 个累积权重,或者如果累积的权重不够,就给我最后一个时间范围内的内容(常规缓冲区功能)
我一直在寻找有关如何在 rx 中使用 Observable.Buffer 的示例,但找不到比样板时间缓冲的东西更重要的东西。
似乎确实存在指定“bufferClosingSelector”的重载,但我无法理解它。
我要做的是创建一个按时间或“累积”缓冲的序列。考虑一个请求流,其中每个请求都有某种权重,我不想一次处理超过 x 个累积权重,或者如果累积的权重不够,就给我最后一个时间范围内的内容(常规缓冲区功能)
bufferClosingSelector
是一个每次调用来获取一个 Observable 的函数,它会在缓冲区被关闭时产生一个值。
例如,
source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))
像常规Buffer(time)
过载一样工作。
如果您想对序列进行加权,您可以Scan
在序列上应用 a ,然后决定您的聚合条件。
例如,source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
给你一个序列,当源序列加起来超过 100 时产生一个值。
您可以使用Amb
这两个关闭条件来竞争,以查看哪个首先反应:
.Buffer(() => Observable.Amb
(
Observable.Timer(TimeSpan.FromSeconds(1)),
source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
)
)
您可以使用任何一系列组合器,这些组合器为要在该点关闭的缓冲区生成任何值。
注意:
给关闭选择器的值无关紧要 - 重要的是通知。所以要结合不同类型的来源,Amb
只需将其更改为System.Reactive.Unit
.
Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())