我有一个应用程序可以跟踪各州的牛奶价格波动,用户在当地杂货店看到牛奶价格时可以访问网站并提交该价格(很像 GasBuddy)。
当他们提交价格时,我会在我的 Observable<MilkPrice> 中收到它,这是牛奶价格的实现
class MilkPrice {
string State {get;set;}
decimal Price {get;set;}
}
当我收到价格时,我维护每个状态的 MilkPriceTracker 对象列表,如果特定状态的对象不存在,我将其添加到我的 List 对象中,MilkPriceTracker 是一个视图模型,它在构造函数中采用 IObservable,这是我如何构造 Tracker 对象
if (!_statesTracker.Any(s => s.State.Equals(receivedInOnNext.State)) {
_statesTracker.Add(new MilkPriceTracker (mainObservable.Where(s.State.Equals(receivedInOnNext.State));
}
默认情况下,我不会将所有 50 个州都添加到我的列表中,因为我只喜欢报告任何价格的州,现在这是有趣的部分,比如来自弗吉尼亚州的 1000 个用户同时开始报告价格...... .
当第一个人报告时,他将构造新的 Tracker 对象并且他的消息将被错过,因为它已经被消费了,我可以尝试传递 Observable.Concat(Observable.Return(alreadyReceivedObj), mainObservable.Wher .....))给构造函数
我试过了,但是由于消息的频率很高,到 mainObservable 的订阅时仍然错过了一些。处理 OnNext 并构造了新的 Tracker。
我如何“暂停” mainObservable 并告诉它“DVR”它或“缓冲”它直到我恢复?所以我不会错过任何消息。
如果我不够清楚,请随时告诉我
谢谢。