0

我有一个应用程序可以跟踪各州的牛奶价格波动,用户在当地杂货店看到牛奶价格时可以访问网站并提交该价格(很像 GasBuddy)。

当他们提交价格时,我会在我的 Observable<MilkPrice> 中收到它,这是牛奶价格的实现

class MilkPrice {
    string State {get;set;}
    decimal Price {get;set;}
 }

当我收到价格时,我维护每个状态的 MilkPriceTracker 对象列表,如果特定状态的对象不存在,我将其添加到我的 List 对象中,MilkPriceTracker 是一个视图模型,它在构造函数中采用 IObservable,这是我如何构造 Tracker 对象

if (!_statesTracker.Any(s => s.State.Equals(receivedInOnNext.State)) { 
      _statesTracker.Add(new MilkPriceTracker (mainObservable.Where(s.State.Equals(receivedInOnNext.State));
     }

默认情况下,我不会将所有 50 个州都添加到我的列表中,因为我只喜欢报告任何价格的州,现在这是有趣的部分,比如来自弗吉尼亚州的 1000 个用户同时开始报告价格...... .

当第一个人报告时,他将构造新的 Tracker 对象并且他的消息将被错过,因为它已经被消费了,我可以尝试传递 Observable.Concat(Observable.Return(alreadyReceivedObj), mainObservable.Wher .....))给构造函数

我试过了,但是由于消息的频率很高,到 mainObservable 的订阅时仍然错过了一些。处理 OnNext 并构造了新的 Tracker。

我如何“暂停” mainObservable 并告诉它“DVR”它或“缓冲”它直到我恢复?所以我不会错过任何消息。

如果我不够清楚,请随时告诉我

谢谢。

4

2 回答 2

2

这听起来并不奇怪,更像是对反应式编程缺乏了解。;-)

尝试使用GroupBy.

于 2013-09-13T09:02:20.810 回答
0

我感觉像跳舞。感谢世界上最好的人 Lee Campbell

我编写了以下代码来解决我的问题。

mainObservable
            .GroupBy(k => k.State)
            .Select(g => new MilkPriceTracker(g.Key, g))
            .ObserveOnDispatcher(DispatcherPriority.DataBind)
            .Subscribe(_statesTrackers.Add);
于 2013-09-13T18:05:03.940 回答