我正在使用 Rx(反应式框架)实现总线,到目前为止它看起来不错。我现在面临的问题是如何在流开始之前添加事件。
为了提供更多上下文,这是针对事件源(a la CQRS/ES)。当总线启动时,一些订阅者会得到IObservable。此时,总线会询问订阅者他们需要从什么事件编号/顺序开始。事件从最低编号开始从磁盘加载并添加到流中,每个订阅者使用 where 语句从正确的事件开始。随着应用程序的运行,新事件被添加并发送给订阅者。这部分效果很好。
一些迟到的订阅者,但仍需要所有事件。看起来ReplaySubject符合要求,只要事件的数量足够小。我在想我可能可以做一些磁盘/离线缓存来保留更多它们(欢迎任何指针!)。
更大的问题是,当一些订阅者获取 IObservable 时,他们需要获取在最初加载的事件之前发生的事件。像下面这样的情况。
当公共汽车启动时,它会向前加载事件#50(最早的人想要的)。现在,一个新的订阅者请求 IObservable,除非他们需要从 #20 开始。所以我想做的是加载 20-49,并在流开始之前添加它们。现有订阅者都不应看到任何事件(它将被 Where 过滤)。
看起来这应该是可能的,但我无法完全理解它。这在 Rx 中可能吗?如果是这样,怎么做?
谢谢!埃里克