4

我们的代码中已经有并行扇出工作(使用ParallelEnumerable),该代码当前运行在 12 核、64G RAM 服务器上。但是我们希望将代码转换为使用 Rx,以便我们可以在下游管道上有更好的灵活性。

当前工作流程:

  1. 我们从数据库中读取数百万条记录(以流式方式)。

  2. 然后在客户端,我们使用自定义OrderablePartitioner<T>类将数据库记录分组。让我们调用这个类的一个实例:partioner.

  3. 然后我们使用partioner.AsParallel().WithDegreeOfParallelism(5).ForAll(group => ProcessGroupOfRecordsAsync(group));

    注意:这可以读作“并行处理所有组,一次 5 个”。(即并行扇出)。

  4. ProcessGroupOfRecordsAsync()– 循环遍历组中的所有记录,并将它们变成数百甚至数千个 POCO 对象以供进一步处理(即串行扇出或更好的是,扩展)。

    根据客户需求:

  5. 这种新的 POCO 对象串行流通过手动过程进行评估、排序、排序、转换、过滤、过滤,并且可能在管道的其余部分中进行更多的并行和/或串行扇出。

  6. 管道的末端可能最终将新记录存储到数据库中,以表格的形式显示 POCO 对象或以各种图表的形式显示。

该过程目前运行良好,只是第 5 点和第 6 点没有我们想要的那么灵活。我们需要能够换入和换出各种下游工作流程。所以,我们的第一次尝试是使用Func<Tin, Tout>这样的:

partioner.AsParallel
         .WithDegreeOfParallelism(5)
         .ForAll(group =>ProcessGroupOfRecordsAsync(group, singleRecord =>
             NextTaskInWorkFlow(singleRecord));

这没问题,但是我们越是清除我们的需求,我们就越意识到我们只是在重新实现 Rx。

因此,我们希望在 Rx 中执行以下操作:

IObservable<recordGroup> rg = dbContext.QueryRecords(inputArgs)
    .AsParallel().WithDegreeOfParallelism(5)
    .ProcessGroupOfRecordsInParallel();

If (client1)
    rg.AnalizeRecordsForClient1().ShowResults();

if (client2)
    rg.AnalizeRecordsForClient2()
      .AsParallel()
      .WithDegreeOfParallelism(3)
      .MoreProcessingInParallel()
      .DisplayGraph()
      .GetUserFeedBack()
      .Where(data => data.SaveToDatabase)
      .Select(data => data.NewRecords)
      .SaveToDatabase(Table2);
...
using(rg.Subscribe(groupId =>LogToScreen(“Group {0} finished.”, groupId);
4

2 回答 2

3

听起来您可能想研究任务并行库中的数据流- 这可能比 Rx 更适合处理第 5 部分,并且可以扩展以处理整个问题。

一般来说,我不喜欢尝试使用 Rx 来并行化 CPU 绑定任务的想法。它通常不合适。如果不是太小心,可能会在不经意间引入低效率。数据流可以为您提供仅在最有意义的地方进行并行化的好方法。

来自 MSDN:

任务并行库 (TPL) 提供数据流组件来帮助提高支持并发的应用程序的稳健性。这些数据流组件统称为 TPL 数据流库。该数据流模型通过为粗粒度数据流和流水线任务提供进程内消息传递来促进基于参与者的编程。数据流组件建立在 TPL 的类型和调度基础架构之上,并与 C#、Visual Basic 和 F# 语言集成以支持异步编程。当您有多个必须以异步方式相互通信的操作或当您希望在数据可用时对其进行处理时,这些数据流组件非常有用。例如,考虑一个处理来自网络摄像头的图像数据的应用程序。通过使用数据流模型,应用程序可以在图像帧可用时对其进行处理。如果应用程序增强了图像帧,例如,通过执行光线校正或红眼消除,您可以创建数据流组件的管道。管道的每个阶段都可能使用更粗粒度的并行功能(例如 TPL 提供的功能)来转换图像。

于 2013-11-01T10:00:16.287 回答
1

卡布!

由于没有人提供任何明确的信息,我会指出源代码可以在GitHub 的 Rx 上浏览。快速浏览一下,看起来至少有一些处理(全部?)已经在线程池上完成了。因此,除了实现自己的调度程序(例如 Rx TestScheduler )之外,也许不可能显式控制并行化程度,但它仍然会发生。另请参阅下面的链接,从答案(尤其是James在第一个链接中提供的答案)来看,可观察任务按设计顺序排列和处理——但可以为 Rx 提供多个流来处理。

另请参阅左侧相关且可见的其他问题(默认情况下)。特别是它看起来像这样一个,Reactive Extensions: Concurrency within the subscriber,可以为您的问题提供一些答案。或者也许使用 Reactive 并行运行方法

<edit:请注意,如果将对象存储到数据库成为问题,则 Rx 流可以将保存操作推送到ConcurrentQueue,然后将单独处理。其他选择是让 Rx 将具有一些时间和项目数量的适当组合的项目排队,并通过批量插入将它们推送到数据库。

于 2013-11-01T07:19:58.520 回答