编辑:2013 年 9 月 15 日 - 我正在描述我的场景,进一步细分为帮助每个人更好地了解我的情况的步骤。也添加了整个应用程序的下载源。如果您想跳到原始问题,请向下滚动到最后一个标题。请让我知道这些问题。谢谢
概括
阿拉斯加州首府朱诺有一座 AST(阿拉斯加州骑兵)总部大楼,他们希望在其中显示一个大屏幕,其中显示并自动更新一个数字。该数字称为(犯罪商数指数)或 CQI
CQI基本上是一个计算出来的数字,显示国家当前的犯罪情况......
它是如何计算的?
运行屏幕的程序是一个 .NET WPF 应用程序,它通过 Hot IObservable 流不断接收 CrimeReport 对象。
每个城市计算 CQI,然后对所有城市进行 Sum(),称为州 CQI 以下是计算州 CQI 的步骤
第 1 步 - 接收犯罪数据
每次报告犯罪时,CrimeReport 都会发送到 .NET 应用程序。它具有以下组件
犯罪日期时间
城市- 管辖的市/县
SeverityLevel - 严重/非严重
EstimatedSolveTime - AST 确定解决犯罪所需的估计天数。
所以在这一步中,我们订阅 IObservable 并创建 MainViewModel 的实例
IObservable<CrimeReport> reportSource = mainSource.Publish();
MainVM = new MainViewModel(reportSource);
reportSource.Connect();
第 2 步 - 按城市分组并按城市计算
当您收到报告时,将其按城市分组,以便
var cities = reportSource.GroupBy(k => k.City)
.Select(g => new CityDto(g.Key, g);
CityDto 是一个 DTO 类,它获取当前城市的所有报告并计算城市的 CQI。
城市 CQI 的计算由以下公式完成
如果严重犯罪总数与非严重犯罪总数的比率小于 1
然后
城市的 CQI = 比率 x 估计求解时间的最小值
别的
城市的 CQI = 比率 x 最大估计求解时间
这是 CityDto 的类定义
internal class CityDto
{
public string CityName { get; set; }
public IObservable<decimal> CityCqi {get; set;}
public CityDto(string cityName, IObservable<CrimeReport> cityReports)
{
CityName = cityName;
// Get all serious and non serious crimes
//
var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
.Scan(0, (p, _) => p++);
var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
.Scan(0, (p, _) => p++);
// Get the ratio
//
var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes,
(s, n) => n == 0? s : s/n); // Avoding DivideByZero here
// Get the minimum and maximum estimated solve time
//
var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime)
.Scan(5000, (p, n) => n < p? n : p);
var maxEstimatedSolveTime = cityReports.Select(c=>c.EstimatedSolveTime)
.Scan(0, (p, n) => n > p? n : p);
//Time for the City's CQI
//
CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m);
}
}
既然我们有 City DTO 对象来维护 City 的 CQI 值并通过 IObservable 公开该实时 CQI,阿拉斯加州的首都想要 Sum() 将所有 Cities 的 CQI 显示为阿拉斯加的 CQI 并将其实时显示在屏幕上和每个在参与 CQI 计划的市/县任何地方报告的犯罪应立即对该州的 CQI 产生影响
第 3 步 - 汇总各州的城市数据
现在我们要计算在大屏幕上实时更新的整个 state 的 CQI,我们有 State 的 view model,叫做 MainViewModel
internal class MainViewModel
{
public MainViewModel(IObservable<CrimeReport> mainReport)
{
/// Here is the snippet also mentioned in Step 2
//
var cities = mainReport.GroupBy(k => k.City)
.Select(g => new CityDto(g.Key, g));
///// T h i s ///// Is //// Where //// I /// am /// Stuck
//
var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>> ,
/// Need to use latest of each observable in allCqi and sum them up
//// How do I do it ?
}
}
约束
- 目前并非阿拉斯加的所有城市都参加了州的 CQI 计划,但城市每天都在招生,所以我无法列出所有城市,无论是否招生,都添加所有城市也不切实际。因此,IObservable 只维护那些不仅参与而且至少发送了一个 CrimeReport 对象的城市。
完整的源代码
源码可以点击这里下载
最初问的问题
我有多个热可观察的单个热可观察...
IObservable<IObservable<decimal>>
我想要一个可观察的,当订阅它时,它的观察者会随时了解内部所有可观察的所有“最新”十进制数字的总和。
我怎样才能做到这一点?我尝试了 CombineLatest(...) 但无法正确处理。
谢谢