假设您有一个 Observable 值流,它可以非常快速地推送值。
现在假设您有一个订阅者需要使用来自该可观察流的最新值来更新数据库中的记录,即有点慢的 I/O 绑定消费者。
换句话说,想象
from rx import Observable
obs = Observable.interval(1)
# Whoops. Observable will push values faster than we can consume them here.
sub = obs.subscribe( do_some_io_bound_operation )
除了可观察对象在不同的 on_next 调用之间创建的最新值之外,还有什么方法可以“跳过”所有内容?
换句话说:假设在上面的示例中,obs 开始推送值 1,2,3... 订阅者调用值为“1”的 do_some_io_bound_operation。这需要一段时间 - 完成时,值 2 和 3 可用。但是,与其对两个新的可用值都调用 do_some_io_bound_operation,理想情况下,订阅者应该跳过值“2”并直接移动到值 3。
有点难以描述 - 我希望意图很明确。有什么办法可以做到这一点?
我想 .Buffer() 进入这个方向 - 但我见过的大多数应用程序只是缓冲一些固定数量的元素或时间跨度,而我需要动态缓冲(缓冲在执行 on_next 时发生的所有事情)
谢谢