0

我正在使用 Rx(反应式框架)实现总线,到目前为止它看起来不错。我现在面临的问题是如何在流开始之前添加事件。

为了提供更多上下文,这是针对事件源(a la CQRS/ES)。当总线启动时,一些订阅者会得到IObservable。此时,总线会询问订阅者他们需要从什么事件编号/顺序开始。事件从最低编号开始从磁盘加载并添加到流中,每个订阅者使用 where 语句从正确的事件开始。随着应用程序的运行,新事件被添加并发送给订阅者。这部分效果很好。

一些迟到的订阅者,但仍需要所有事件。看起来ReplaySubject符合要求,只要事件的数量足够小。我在想我可能可以做一些磁盘/离线缓存来保留更多它们(欢迎任何指针!)。

更大的问题是,当一些订阅者获取 IObservable 时,他们需要获取在最初加载的事件之前发生的事件。像下面这样的情况。

当公共汽车启动时,它会向前加载事件#50(最早的人想要的)。现在,一个新的订阅者请求 IObservable,除非他们需要从 #20 开始。所以我想做的是加载 20-49,并在流开始之前添加它们。现有订阅者都不应看到任何事件(它将被 Where 过滤)。

看起来这应该是可能的,但我无法完全理解它。这在 Rx 中可能吗?如果是这样,怎么做?

谢谢!埃里克

4

2 回答 2

3

我会以一种非常简单的 Rx 方式来看待这个问题。不要使用 a ReplaySubject,对所有“实时”事件使用单个Subject,并为每个订阅者连接前面的历史事件。

我想你可能有某种数据结构来跟踪历史记录。如果您已经在内存中加载/捕获了 #50 到 #80 的事件,那么当一个新的订阅者出现请求来自 #20 的事件时,我会做这样的事情:

var fromIndex = 20;
var eventSubject = new Subject<Event>();
capturedEvents
    .GetFromIndex(fromIndex) // Gets a enumerable list of events
    .ToObservable()
    .Concat(eventSubject)
    .Subscribe( ... );

然后,您可以eventSubject使用 来捕获新事件,如下所示:

eventSubject.Subscribe(capturedEvents.CaptureEvent);

这是否满足您的需求?

于 2012-05-12T07:19:32.490 回答
2

怎么样:

itemsFromTwentyToFourtyNine.Concat(currentItems);
于 2012-05-12T06:00:41.467 回答