0

这个问题是关于rxpy的。

我正在尝试构建一个反应系统来处理来自可观察源的消息。除此之外,我正在尝试将它与基于 zookeeper 的领导选举系统集成。

这种组合将只允许进程群中的一个领导者处理消息流。以下是我正在尝试构建的代码的要点。

# event_source is an observable of messages
# manager.leaders is an observable of leader election events
# manager.followers is an observable of leader relinquish events
event_source\
    .skip_until(manager.leaders)\
    .take_until(manager.followers)\
    .subscribe(observer)

它工作正常,但我需要在两者之间注入skip_until一块take_until来处理回填。这旨在处理领导进程失败与另一个担任领导的进程之间的潜在差距。每条处理过的消息都会留下一条记录,以便新的领导者可以在继续处理流之前赶上丢失的消息(如果有的话)。

我尝试了start_with运算符但没有成功。我不是以一种不适合使用的方式接近它吗?

最终,我正在寻找的解决方案是在由另一个流中的事件触发的流中注入特定数量的项目。

4

1 回答 1

0

那这个呢:

manager.leaders \
    .flat_map(lambda e: event_source
                  .start_with(...)
                  .take_until(manager.followers))

每次manager.leaders发出消息event_source都会被订阅,从注入的项目开始,直到manager.followers发出。

于 2017-04-28T08:26:22.123 回答