问题标签 [tpl-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1169 浏览

c# - BroadcastBlock 未按预期工作

我使用BroadcastBlock其中TPL Dataflow链接到ActionBlocks. 但是,我在我的代码中找不到错误。我将项目发布到BroadcastBlockvia SendAsync,当我在其中打印项目时,ActionBlocks我发现有些项目丢失了,而神秘的是其他项目是重复的。我跟踪处理项目的顺序,因此发现只是在一个项目丢失之前或后续项目的情况下被复制(通过 馈送到相同的项目ActionblockLinkTo。我不知道发生了什么。以下是我的一些相关代码片段:

这里的实例化BroadCastBlock

这是我链接到操作块的方式:

这就是我将项目(报价)提交给BroadCastBlock

问题是,当我在发送之前打印出跟踪信息时,SendAsync一切看起来都很好。ActionBlock当我在该链接中打印相同的信息时,BroadCastBlock我观察到不常见的错误。我注意到当物品被BroadCastBlock快速送入(通过EventHandlerAPI)时,会发生丢失/重复的物品

知道我做错了什么吗?

0 投票
1 回答
4233 浏览

entity-framework - 实体框架和并行

背景

我有一个应用程序接收定期数据转储(XML 文件)并使用 Entity Framework 5(代码优先)将它们导入现有数据库。导入是通过 EF5 而不是 BULK INSERT 或 BCP 进行的,因为必须应用实体中已经存在的业务规则。

处理似乎在应用程序本身中受 CPU 限制(极快的、启用写入缓存的磁盘 IO 子系统在整个过程中显示几乎为零的磁盘等待时间,而 SQL Server 显示不超过 8%-10% 的 CPU 时间)。

为了提高效率,我使用带有组件的TPL Dataflow 构建了一个管道,用于:

我看到这样做可以显着提高性能,但不能使 CPU 超过 60%。

分析

怀疑某种资源争用,我使用 VS2012 Profiler 的资源争用数据(并发)模式运行该过程。

分析器向我显示标记为Handle 2的资源的争用率为 52% 。深入研究,我看到为Handle 2创建最多争用的方法是

第二位,大约是 SaveChanges() 的 40%,是

问题

  • 我怎样才能弄清楚Handle 2到底是什么(例如 TPL 的一部分,EF 的一部分)?
  • EF 是否会限制从单独线程中分离 DbContext 实例的调用?他们似乎在争夺一个共享资源。
  • 在这种情况下,我可以做些什么来提高并行性?

更新

对于有问题的运行,调用 SaveChanges 的任务的最大并行度设置为 12(我在以前的运行中尝试了各种值,包括 Unbounded)。

更新 2

Microsoft 的 EF 团队提供了反馈。请参阅我的答案以获取摘要。

0 投票
1 回答
617 浏览

.net - 如何获得 IPropagatorBlock喂自己停下来?

让我们假设我从一个TransformBlock<Uri, string>(它本身是一个实现IPropagatorBlock<Uri, string>)开始,它接受Uri然后在 a 中获取内容string(这是一个网络爬虫):

一旦我将内容包含在字符串中,我就会解析它以获取链接。由于一个页面可以有多个链接,我使用 aTransformManyBlock<string, Uri>将单个结果(内容)映射到多个链接:

解析器的关键是它可以传回一个空序列,表明没有更多项目需要解析。

但是,这仅适用于树的一个分支(或网络的一部分)。

然后我将下载器链接到解析器,然后返回到下载器,如下所示:

现在,我知道我可以让所有东西都停止在块之外(通过调用Complete其中一个),但是我怎样才能从块发出信号表明它已经完成?

还是我必须自己以某种方式管理这种状态?

现在,它只是挂起,因为在下载并解析了所有内容之后,下载程序块被饿死了。

这是一个完全包含的测试方法,它在调用时挂起Wait

其输出(如预期的那样,在挂起之前)是:

0 投票
3 回答
1688 浏览

c# - 长时间运行进程的并行化和性能优化

我想并行化逐帧处理多个视频剪辑的应用程序。每个剪辑的每一帧的序列很重要(显然)。我决定使用 TPL 数据流,因为我相信这是数据流的一个很好的例子(电影帧是数据)。

所以我有一个从数据库加载帧的进程(可以说是一批 500 个,全部聚集在一起)

并将它们发布到 BufferBlock。对于这个 BufferBlock,我将 ActionBlocks 与过滤器链接起来,让每个 MovieID 有一个 ActionBlock,这样我就可以获得某种数据分区。每个动作块都是顺序的,但理想情况下,多部电影的多个动作块可以并行运行。

我确实有上述网络工作并且它确实并行运行,但根据我的计算,只有八到十个动作块同时执行。我计时了每个 ActionBlock 的运行时间,大约为 100-200 毫秒。我可以采取哪些步骤来至少双倍并发?

我确实尝试将操作委托转换为异步方法并在 ActionBlock 操作委托中使数据库访问异步,但它没有帮助。

编辑:我实现了额外级别的数据分区:具有奇数 ID 的电影的帧在 ServerA 上处理,偶数电影的帧在 ServerB 上处理。应用程序的两个实例都访问了同一个数据库。如果我的问题是 DB IO,那么我不会看到处理的总帧数有任何改善(或者很少,低于 20%)。但我确实看到它翻了一番。所以这让我得出结论,Threadpool 没有产生更多线程来并行处理更多帧(两台服务器都是四核的,分析器显示每个应用程序大约有 25-30 个线程)。

0 投票
2 回答
1967 浏览

c# - 如何并行处理项目然后合并结果?

我面临以下问题:

我有一个对象数据流,Foo并将这些对象流式传输到几个并发的进程内任务/线程,这些任务/线程依次处理对象和输出FooResult对象。每个都FooResult包含在其他成员Foo中与创建FooResult. 但是,并非每个人都Foo必须创建一个FooResult.

我的问题是我想从整个过程中传递一个包装对象,该对象包含原始对象和可能从并发任务中创建的Foo所有对象(如果有的话) 。FooResultFoo

注意:我目前使用 TPL 数据流,而每个并发进程都发生在ActionBlock<Foo>BroadCastBlock<Foo>. 它用于SendAsync()向目标数据流块发送可能创建FooResult的 . 显然,并发数据流块FooResult在不可预测的时间产生,这是我目前正在努力解决的问题。我似乎无法弄清楚FooResult总共创建了多少个ActionBlock<Foo>,以便我可以将它们与原始对象捆绑在一起Foo并将其作为包装对象传递。

在伪代码中,它目前看起来如下:

Foo但是,当前代码的问题在于,如果 a没有FooResult在任何操作块中生成单个,则 targetBlock 可能不会收到任何内容。此外,可能 targetBlock 接收 2 个FooResult对象,因为每个动作块产生一个FooResult.

我想要的是 targetBlock 接收一个包含每个对象的包装对象,Foo如果FooResult创建了对象,那么也是一个FooResult.

有什么想法可以使解决方案按照描述的方式工作吗?它不必仔细阅读 TPL 数据流,但如果这样做会很整洁。

更新:以下是我通过 svick 建议的 JoinBlock 实现得到的。我不会使用它(除非它可以在性能方面进行调整),因为它运行起来非常慢,我每秒可以处理大约 89000 个项目(而且那只是 int 值类型)。

更新代码以反映建议:

0 投票
5 回答
17037 浏览

c# - TPL Dataflow,仅在所有源数据块完成时才保证完成

当两个变换块都完成时,如何重写代码完成的代码?我认为完成意味着它被标记为完成并且“out queue”是空的?

我编辑了代码,为每个转换块添加了输入缓冲区计数。很明显,所有 100 个项目都流式传输到每个变换块。但是一旦一个变换块完成,处理器块就不再接受任何项目,而是不完整的变换块的输入缓冲区只是刷新输入缓冲区。

0 投票
1 回答
4332 浏览

task-parallel-library - Tasks vs. TPL Dataflow vs. Async/Await,什么时候使用?

我已经阅读了一些 Microsoft 团队或其他作者的大量技术文档,详细介绍了新的 TPL 数据流库、异步/等待并发框架和 TPL 的功能。但是,我还没有真正遇到任何明确描述何时使用的内容。我知道每个都有自己的位置和适用性,但我特别想知道以下情况:

我有一个完全在进程中运行的数据流模型。顶部是一个数据生成组件 (A),它生成数据并通过数据流块链接或通过引发事件将其传递给处理组件 (B)。(B) 中的某些部分必须同步运行,而 (A) 大量受益于并行性,因为大多数进程都受 I/O 或 CPU 限制(从磁盘读取二进制数据,然后对其进行反序列化和排序)。最后,处理组件 (B) 将转换后的结果传递给 (C) 以供进一步使用。

我特别想知道何时在以下方面使用任务、异步/等待和 TPL 数据流块:

  • 启动数据生成组件 (A)。我显然不想锁定 gui/仪表板,因此这个过程必须在不同的线程/任务上运行。

  • 如何调用(A)、(B)、(C)中不直接参与数据生成和处理过程但执行可能需要数百毫秒/秒才能返回的配置工作的方法。我的直觉是,这就是 async/await 的亮点?

  • 我最挣扎的是如何最好地设计从一个组件传递到下一个组件的消息。TPL 数据流看起来很有趣,但有时对我的目的来说太慢了。(注意最后关于性能问题)。如果不使用 TPL Dataflow,我如何通过进程间任务/并发数据传递来实现响应性和并发性?例如,如果我在任务中引发事件,订阅的事件处理程序会在同一个任务中运行,而不是传递给另一个任务,对吗?综上所述,组件(A)在将数据传递给组件(B)而组件(B)检索数据并专注于处理数据后如何开展业务?这里最好使用哪种并发模型?我在这里实现了数据流块,但这真的是最好的方法吗?

  • 我想以上几点总结表明我在如何使用标准实践设计和实现 API 类型组件方面遇到了困难?方法是否应该设计为异步的,数据输入作为数据流块,数据输出作为数据流块或事件?一般来说,最好的方法是什么?我之所以问,是因为上面提到的大多数组件都应该独立工作,因此它们基本上可以在内部被换出或独立更改,而无需重新编写访问器和输出。

性能注意事项:我提到 TPL 数据流块有时很慢。我处理高吞吐量、低延迟类型的应用程序和目标磁盘 I/O 限制,因此 tpl 数据流块的执行速度通常比同步处理单元慢得多。问题是我不知道如何将流程嵌入到它自己的任务或并发模型中以实现与 tpl 数据流块已经处理的类似的东西,但没有 tpl df 带来的开销。

0 投票
2 回答
6779 浏览

c# - TPL Dataflow,如何将项目转发到多个链接目标块中的一个特定目标块?

我正在寻找一种 TPL 数据流块解决方案,它可以容纳多个项目,它可以链接到多个目标块,但它能够将项目仅转发到通过过滤器/谓词的特定目标块。任何时候都不应将一个项目同时交付给多个目标块,始终只交付给与过滤器匹配的那个,否则该项目可以被丢弃。我不喜欢 BroadCastBlock,因为如果我理解正确,它不能保证交付(或者是吗?)并且过滤是在目标块端完成的,这意味着 BroadCastBlock 基本上将每个项目的副本发送到所有链接到目标块。如果我理解正确的话,它在任何时候都不会超过一件。我不想使用 Post/Async 但维护一个 LinkTo 链。

有没有办法绕过完整的自定义数据流块?还是我误解了 BroadCastBlock 的工作原理?不幸的是,没有太多的文档可以详细介绍并涵盖用例。任何想法都受到高度赞赏。

0 投票
2 回答
12164 浏览

c# - TPL Dataflow,Post() 和 SendAsync() 之间的功能区别是什么?

我对通过 Post() 或 SendAsync() 发送项目之间的区别感到困惑。我的理解是,在所有情况下,一旦项目到达数据块的输入缓冲区,控制权就会返回到调用上下文,对吗?那我为什么需要 SendAsync 呢?如果我的假设不正确,那么我想知道,相反,如果使用数据块的整个想法是建立一个并发和异步环境,为什么有人会使用 Post()。

我当然理解技术上的区别,即 Post() 返回一个 bool 而 SendAsync 返回一个可等待的 bool 任务。但这有什么影响?什么时候会延迟返回布尔值(我的理解是确认该项目是否已放入数据块的队列中)?我理解 async/await 并发框架的一般概念,但在这里它没有多大意义,因为除了 bool 之外,对传入项目所做的任何事情的结果都不会返回给调用者,而是放在一个“out-queue”并转发到链接的数据块或丢弃。

发送物品时这两种方法之间有什么性能差异吗?

0 投票
3 回答
1638 浏览

c# - TPL Dataflow,我可以查询数据块是否标记为完成但尚未完成?

鉴于以下情况:

targetBlock似乎永远不会完成,我认为原因是其中的所有项目都TransformBlock targetBlock在输出队列中等待,因为我没有将其链接targetBlock到任何其他 Dataflow 块。但是,我真正想要实现的是当 (A)targetBlock被通知完成并且 (B) 输入队列为空时的通知。我不想关心项目是否仍然位于TransformBlock. 我该怎么办?是获取我想要查询sourceBlockAND 的完成状态以确保 的InputCount为零targetBlock的唯一方法吗?我不确定这是否非常稳定(sourceBlock如果最后一项sourceBlock已传递给targetBlock?)。有没有更优雅、更有效的方式来实现相同的目标?

编辑:我刚刚注意到,即使是“肮脏”的方式来检查sourceBlock与为零InputCount的完成情况targetBlock也并非易事。那个街区会坐在哪里?它不能在其中,targetBlock因为一旦满足上述两个条件,显然就targetBlock不再处理任何消息。还检查完成状态sourceBlock引入了很多低效率。