0

假设您有一个 Observable 值流,它可以非常快速地推送值。

现在假设您有一个订阅者需要使用来自该可观察流的最新值来更新数据库中的记录,即有点慢的 I/O 绑定消费者。

换句话说,想象

from rx import Observable
obs = Observable.interval(1)

# Whoops. Observable will push values faster than we can consume them here.
sub = obs.subscribe( do_some_io_bound_operation )

除了可观察对象在不同的​​ on_next 调用之间创建的最新值之外,还有什么方法可以“跳过”所有内容?

换句话说:假设在上面的示例中,obs 开始推送值 1,2,3... 订阅者调用值为“1”的 do_some_io_bound_operation。这需要一段时间 - 完成时,值 2 和 3 可用。但是,与其对两个新的可用值都调用 do_some_io_bound_operation,理想情况下,订阅者应该跳过值“2”并直接移动到值 3。

有点难以描述 - 我希望意图很明确。有什么办法可以做到这一点?

我想 .Buffer() 进入这个方向 - 但我见过的大多数应用程序只是缓冲一些固定数量的元素或时间跨度,而我需要动态缓冲(缓冲在执行 on_next 时发生的所有事情)

谢谢

4

1 回答 1

1

不,没有正确的方法。
每当事件发生时, Observable都会通知订阅的 Observer 实例,因此订阅者无法访问所有项目,也不会影响流的生成。
您可以在 Observable 级别执行基于时间的操作,例如去抖动,但您不能在订阅服务器上的工作期间阻止/跳过元素。
您可以实现自己的自定义运算符来跳过消息,但这通常是错误的。

于 2019-08-22T06:46:52.590 回答