我们的代码中已经有并行扇出工作(使用ParallelEnumerable
),该代码当前运行在 12 核、64G RAM 服务器上。但是我们希望将代码转换为使用 Rx,以便我们可以在下游管道上有更好的灵活性。
当前工作流程:
我们从数据库中读取数百万条记录(以流式方式)。
然后在客户端,我们使用自定义
OrderablePartitioner<T>
类将数据库记录分组。让我们调用这个类的一个实例:partioner
.然后我们使用
partioner.AsParallel().WithDegreeOfParallelism(5).ForAll(group => ProcessGroupOfRecordsAsync(group));
注意:这可以读作“并行处理所有组,一次 5 个”。(即并行扇出)。ProcessGroupOfRecordsAsync()
– 循环遍历组中的所有记录,并将它们变成数百甚至数千个 POCO 对象以供进一步处理(即串行扇出或更好的是,扩展)。
根据客户需求:这种新的 POCO 对象串行流通过手动过程进行评估、排序、排序、转换、过滤、过滤,并且可能在管道的其余部分中进行更多的并行和/或串行扇出。
管道的末端可能最终将新记录存储到数据库中,以表格的形式显示 POCO 对象或以各种图表的形式显示。
该过程目前运行良好,只是第 5 点和第 6 点没有我们想要的那么灵活。我们需要能够换入和换出各种下游工作流程。所以,我们的第一次尝试是使用Func<Tin, Tout>
这样的:
partioner.AsParallel
.WithDegreeOfParallelism(5)
.ForAll(group =>ProcessGroupOfRecordsAsync(group, singleRecord =>
NextTaskInWorkFlow(singleRecord));
这没问题,但是我们越是清除我们的需求,我们就越意识到我们只是在重新实现 Rx。
因此,我们希望在 Rx 中执行以下操作:
IObservable<recordGroup> rg = dbContext.QueryRecords(inputArgs)
.AsParallel().WithDegreeOfParallelism(5)
.ProcessGroupOfRecordsInParallel();
If (client1)
rg.AnalizeRecordsForClient1().ShowResults();
if (client2)
rg.AnalizeRecordsForClient2()
.AsParallel()
.WithDegreeOfParallelism(3)
.MoreProcessingInParallel()
.DisplayGraph()
.GetUserFeedBack()
.Where(data => data.SaveToDatabase)
.Select(data => data.NewRecords)
.SaveToDatabase(Table2);
...
using(rg.Subscribe(groupId =>LogToScreen(“Group {0} finished.”, groupId);