我一直在尝试创建一个 observable,它从存储库缓存中流式传输世界状态(快照),然后从单独的提要中进行实时更新。问题是快照调用正在阻塞,因此必须在此期间缓冲更新。
这是我想出的,稍微简化了一点。GetStream()方法是我关心的方法。我想知道是否有更优雅的解决方案。假设 GetDataFeed() 全天对缓存进行更新。
private static readonly IConnectableObservable<long> _updateStream;
public static Constructor()
{
_updateStream = GetDataFeed().Publish();
_updateStream.Connect();
}
static void Main(string[] args)
{
_updateStream.Subscribe(Console.WriteLine);
Console.ReadLine();
GetStream().Subscribe(l => Console.WriteLine("Stream: " + l));
Console.ReadLine();
}
public static IObservable<long> GetStream()
{
return Observable.Create<long>(observer =>
{
var bufferedStream = new ReplaySubject<long>();
_updateStream.Subscribe(bufferedStream);
var data = GetSnapshot();
// This returns the ticks from GetSnapshot
// followed by the buffered ticks from _updateStream
// followed by any subsequent ticks from _updateStream
data.ToObservable().Concat(bufferedStream).Subscribe(observer);
return Disposable.Empty;
});
}
private static IObservable<long> GetDataFeed()
{
var feed = Observable.Interval(TimeSpan.FromSeconds(1));
return Observable.Create<long>(observer =>
{
feed.Subscribe(observer);
return Disposable.Empty;
});
}
流行的观点反对主题,因为它们不是“功能性的”,但如果没有 ReplaySubject,我找不到这样做的方法。热可观察对象上的重播过滤器不起作用,因为它会重播所有内容(可能是一整天的陈旧更新)。
我也担心比赛条件。有没有办法保证某种排序,是否应该在快照之前缓冲较早的更新?与其他 RX 操作员一起,整个事情可以更安全、更优雅地完成吗?
谢谢。
-将要