7

编辑:2013 年 9 月 15 日 - 我正在描述我的场景,进一步细分为帮助每个人更好地了解我的情况的步骤。也添加了整个应用程序的下载源。如果您想跳到原始问题,请向下滚动到最后一个标题。请让我知道这些问题。谢谢

概括

阿拉斯加州首府朱诺有一座 AST(阿拉斯加州骑兵)总部大楼,他们希望在其中显示一个大屏幕,其中显示并自动更新一个数字。该数字称为(犯罪商数指数)或 CQI

CQI基本上是一个计算出来的数字,显示国家当前的犯罪情况......

它是如何计算的?

运行屏幕的程序是一个 .NET WPF 应用程序,它通过 Hot IObservable 流不断接收 CrimeReport 对象。

每个城市计算 CQI,然后对所有城市进行 Sum(),称为州 CQI 以下是计算州 CQI 的步骤

第 1 步 - 接收犯罪数据

每次报告犯罪时,CrimeReport 都会发送到 .NET 应用程序。它具有以下组件

犯罪日期时间

城市- 管辖的市/县

SeverityLevel - 严重/非严重

EstimatedSolveTime - AST 确定解决犯罪所需的估计天数。

所以在这一步中,我们订阅 IObservable 并创建 MainViewModel 的实例

IObservable<CrimeReport> reportSource = mainSource.Publish();
  MainVM = new MainViewModel(reportSource);
reportSource.Connect();

第 2 步 - 按城市分组并按城市计算

当您收到报告时,将其按城市分组,以便

var cities = reportSource.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g);

CityDto 是一个 DTO 类,它获取当前城市的所有报告并计算城市的 CQI。

城市 CQI 的计算由以下公式完成

如果严重犯罪总数与非严重犯罪总数的比率小于 1

然后

城市的 CQI = 比率 x 估计求解时间的最小值

别的

城市的 CQI = 比率 x 最大估计求解时间

这是 CityDto 的类定义

internal class CityDto 
{
 public string CityName { get; set; }
 public IObservable<decimal> CityCqi {get; set;}

 public CityDto(string cityName, IObservable<CrimeReport> cityReports)
 {
   CityName = cityName;
   // Get all serious and non serious crimes
   //
     var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
     .Scan(0, (p, _) => p++);
     var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
     .Scan(0, (p, _) => p++);

     // Get the ratio
     //
     var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes, 
                   (s, n) => n ==  0? s : s/n); // Avoding DivideByZero here

     // Get the minimum and maximum estimated solve time
     //
       var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime) 
                     .Scan(5000, (p, n) => n < p? n : p);
       var maxEstimatedSolveTime = cityReports.Select(c=>c.EstimatedSolveTime)
                     .Scan(0, (p, n) => n > p? n : p);

    //Time for the City's CQI
    // 
      CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m);
 }
}

既然我们有 City DTO 对象来维护 City 的 CQI 值并通过 IObservable 公开该实时 CQI,阿拉斯加州的首都想要 Sum() 将所有 Cities 的 CQI 显示为阿拉斯加的 CQI 并将其实时显示在屏幕上和每个在参与 CQI 计划的市/县任何地方报告的犯罪应立即对该州的 CQI 产生影响

第 3 步 - 汇总各州的城市数据

现在我们要计算在大屏幕上实时更新的整个 state 的 CQI,我们有 State 的 view model,叫做 MainViewModel

internal class MainViewModel
{
    public MainViewModel(IObservable<CrimeReport> mainReport)
    {
         /// Here is the snippet also mentioned in Step 2
         //
           var cities = mainReport.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g));

         ///// T h i s ///// Is //// Where //// I /// am /// Stuck
         //
          var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>> ,

         /// Need to use latest of each observable in allCqi and sum them up
           //// How do I do it ?
     }
}

约束

  • 目前并非阿拉斯加的所有城市都参加了州的 CQI 计划,但城市每天都在招生,所以我无法列出所有城市,无论是否招生,都添加所有城市也不切实际。因此,IObservable 只维护那些不仅参与而且至少发送了一个 CrimeReport 对象的城市。

完整的源代码

源码可以点击这里下载

最初问的问题

我有多个热可观察的单个热可观察...

IObservable<IObservable<decimal>>

我想要一个可观察的,当订阅它时,它的观察者会随时了解内部所有可观察的所有“最新”十进制数字的总和。

我怎样才能做到这一点?我尝试了 CombineLatest(...) 但无法正确处理。

谢谢

4

2 回答 2

7

Rxx库有一个重载,CombineLatest()它需要一个IObservable<IObservable<T>>. 如果您使用此重载,则解决方案很简单:

var runningSum = allCqis
    .Select(cqi => cqi.StartWith(0)) // start each inner sequence off with 0
    .CombineLatest() // produces an IObservable<IList<decimal>>
    .Select(cqis => cqis.Sum()); // LINQ operator Sum(IEnumerable<decimal>)

查看源代码Rxx.CombineLatest可能有助于了解如何“在幕后”解决问题

于 2013-09-16T18:05:11.613 回答
0

很多问题!也许是时候复习一下你的 Rx Skillz 了?我的网站IntroToRx.com几乎涵盖了您最近提出的所有问题。对 Rx 有深入的了解,将使您能够比在论坛上提问更快地回答这些相当简单的问题。您应该能够在不到 3 天的时间内阅读这本书。

反正....

1 你想要一个连续的总和还是最后一个单一的总和?

2 然后,您想要所有流的所有值的总和,还是每个流的总和?

要获取序列的单个总和值,请使用.Sum()运算符。http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#MaxAndMin

要获得运行总计,请使用Scan运算符。http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#Scan

所以答案可能是这样的(未经测试):

sources.Select(source=>source.Scan(0m, (acc, value)=>acc+=value)).Merge();
于 2013-09-14T20:09:37.937 回答