问题标签 [tpl-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
11271 浏览

.net - VS 2012 RC 中引用 TPL 数据流和 TPL 的问题

我刚刚将 Visual Studio 11 Beta 升级到新的 Visual Studio 2012 RC,并且在引用 TPL 数据流时遇到了问题。

首先,我尝试像以前一样通过从框架中添加引用来引用 Dataflow。但是当我尝试这样做时,我得到一个错误框:

无法添加对“System.Threading.Tasks.Dataflow”的引用。

然后整个 Visual Studio 冻结。

在阅读了 MEF 和 TPL Dataflow NuGet Packages for .NET Framework 4.5 RC之后,我认为参考列表中显示的 Dataflow 版本是以前安装的某种工件。所以,我尝试使用来自 NuGet 的 Dataflow,这似乎可以工作,直到我真正尝试编译我的代码,因为我收到了一个错误:

“System.Threading.Tasks.Task”类型是在未引用的程序集中定义的。您必须添加对程序集“System.Threading.Tasks, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a”的引用。

这很令人困惑,因为Task在 mscorlib 中,不需要其他引用。但是在引用列表中有一个引用程序集System.Threading.Tasks,所以我尝试添加它。不幸的是,出现了一个熟悉的错误:

无法添加对“System.Threading.Tasks”的引用。

然后 Visual Studio 再次冻结。

难道我做错了什么?如何在 VS 2012 RC 中使用 TPL 数据流?

0 投票
3 回答
721 浏览

c# - c#中的线程构建分布式DFS

我一直在尝试在 C# 中实现分布式深度优先搜索。我已经成功到了一定程度,但出现了同步错误。我无法纠正错误。我想要做的是让每个节点使用任务并行数据流相互通信,从而在 DFS 中实现并行性。下面是我的代码:

在此处输入图像描述

0 投票
1 回答
2742 浏览

c# - 使用异步操作的 TPL 数据流

我正在通过移植一些旧的套接字代码以使用 TPL 数据流和新的异步功能来试验 TPL 数据流。尽管 API 感觉坚如磐石,但我的代码最终还是感觉很乱。我想知道我是否在这里遗漏了什么。

我的要求如下: 一个套接字类公开:Open、Close、Send 和 Receive 方法。所有都返回一个任务,因此是异步的。Open 和 Close 是原子的。发送和接收可以彼此相邻工作,尽管它们一次只能处理 1 个命令。

从逻辑上讲,这让我想到了下一段内部控制代码:

到目前为止一切都很好。我可以安全地将操作发送到发送和接收块,而同时不必担心正在运行的连接相关操作。ActionBlock 还确保多个发送调用是同步的(接收、关闭和打开同上)。

问题是没有简单的方法让动作将任务传达回海报。现在我正在使用 TaskCompletionSource 来传达结果。喜欢:

只是感觉丑陋和笨拙。我的问题是:有没有办法使用 TPL 同步工作流,而不必使用 TaskCompletionSource 进行通信?

谢谢!

0 投票
2 回答
2690 浏览

c# - 这是 TPL Dataflow 的工作吗?

我在不同的任务上运行了一个非常典型的生产者/消费者模型。

Task1:从二进制文件中读取批量字节[],并为每个字节数组集合启动一个新任务。(出于内存管理目的,该操作是批处理的)。

任务 2-n:这些是工作任务,每个都对传入的字节数组集合(来自 Tasks1)进行操作,并对字节数组进行反序列化,按特定标准对其进行排序,然后存储结果对象的集合(每个字节数组反序列化为此类对象)在并发字典中。

任务 (n+1) 我选择了并发字典,因为此任务的工作是合并存储在并发字典中的那些集合,其顺序与它们源自 Task1 的顺序相同。我通过从 Task1 一直向下传递到此任务的 collectionID(它是 int 类型并为 Task1 中的每个新集合递增)来实现这一点。该任务主要检查下一个预期的 collectionID 是否已经存储在并发字典中,如果是,则将其取出,将其添加到最终队列并检查并发字典中的下一个集合。

现在,从我阅读的内容和观看的视频来看,在我看来,TPL Dataflow 可能是这种生产者/消费者模型的完美候选者。我只是似乎无法设计一个设计并因此开始,因为我从未使用过 TPL Dataflow。就吞吐量和延迟而言,这个库是否能胜任这项任务?我目前在结果集合中每秒处理 250 万字节数组和对象。TPL 数据流可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL Dataflow 在生成工作任务并在工作任务完成后重新合并它们时,是否可以保留来自 Task1 的收集批次的顺序?它会优化事物吗?在剖析了整个结构之后,我觉得由于旋转和涉及太多并发集合而浪费了相当多的时间。

有什么想法、想法吗?

0 投票
4 回答
5560 浏览

c# - TPL 数据流加速?

我想知道是否可以优化以下代码以更快地执行。我目前似乎在一个非常简单的数据流结构上每秒最多可以处理 140 万条简单消息。我知道这个示例过程同步传递/转换消息,但是,我目前测试 TPL Dataflow 作为我自己的基于任务和并发集合的自定义解决方案的可能替代品。我知道术语“并发”已经建议我并行运行,但为了当前的测试目的,我通过同步在自己的解决方案上推送消息,每秒大约有 510 万条消息。我在这里缺少什么,我读到 TPL 数据流被推为高吞吐量、低延迟的解决方案,但到目前为止我必须忽略性能调整。谁能指出我正确的方向?

0 投票
1 回答
2216 浏览

c# - 最优化的 TPL 数据流设计?

我想询问如何使用 TPL Dataflow 最好地设计最佳架构。我还没有编写代码,所以没有可以发布的示例代码。我也不是在寻找代码(除非自愿),但非常感谢设计方面的帮助:

要求如下:

我有 3 个以特定方式相互依赖的核心数据块。Datablock1 是生产 Foo1 类型对象的生产者。Datablock2 应该订阅 Foo1 对象(来自 Datablock1),并且可能(不是在每个 Foo1 上,受特定函数约束)生成 Foo2 对象,并将其存储在输出队列中以供其他数据块使用。Datablock3 还使用 Foo1 对象(来自 Datablock1)并可能生成 Foo3 对象,Datablock2 使用这些对象并转换为 Foo2 对象。

总而言之,以下是数据块以及它们各自产生和消耗的内容:

  • Datablock1:生产(Foo1),消费(无)
  • Datablock2:生产(Foo2),消费(Foo1,Foo3)
  • Datablock3:生产(Foo3),消费(Foo1)

另一个要求是在 Datablock2 和 Datablock3 中几乎同时处理相同的 Foo1。如果 Foo1 对象首先由 Datablock2 使用,然后一旦 Datablock2 完成其工作,同样的 Foo1 对象将被发布到 Datablock3 以供其工作。Datablock2 中的 Foo2 对象可以来自对 Foo1 对象或 Foo3 对象的操作。

我希望这是有道理的,如果仍然不清楚,我很乐意解释更多。

我的第一个想法是为 3 个数据块中的每一个创建 TPL 数据流块,并让它们处理不同对象类型的传入流。另一个想法是拆分数据块,让每个数据块只处理一种对象类型的流。您有什么建议,或者是否有更好的解决方案可能有效?

Svick 已经在 Datablock1 上提供了帮助,它已经投入使用,我只是纠结于如何将我当前的环境(如上所述)转换为 TPL Dataflow。

任何想法或指针都非常感谢。

0 投票
1 回答
3061 浏览

.net - TPL 数据流和 Rx 组合示例

我只想学习两者以及如何一起使用它们。我知道它们可以相互补充,我只是找不到有人实际这样做的例子。

0 投票
1 回答
433 浏览

c# - 如何通过所有数据流块传播消息上限?

我想知道是否有一种方法可以设置数据流块中的队列可以容纳的最大项目/消息数(缓冲区块或动作块队列),并将这种上限向上传播到其他数据块,这些数据块可能会提供包含队列的数据块,它可以容纳,设置的最大项目。我猜提供数据块的队列只会增加项目,对吗?有没有办法让它们也锁定,或者我是否需要为链接到包含有上限队列的块的所有块队列实现上限?

例如,我可以从链接到缓冲区块的自定义生成数据流块开始。这个缓冲区块链接到一个动作块,我希望它的队列不超过队列中的一定数量的项目。如何防止生产数据流块或缓冲区块中的队列在自己的队列中存储越来越多的项目,因为知道链中的最后一个数据流块不能在某个时间点消耗更多项目并且 inQueue 不能接受更多项目?

0 投票
1 回答
409 浏览

c# - TPL 数据流消息类型分配模式可能吗?

我运行一个算法来接收不同类型的进程外消息。传入的消息实际上是字节数组,每个字节数组前面都有一个字节数组标志,指示消息类型。我想了解是否可以设置一个IPropagator<byte[], byte[]>处理传入的字节数组,解释字节数组标志,然后将字节数组流式传输到特定的相应链接 ActionBlock。

例如,假设我有 2 种不同的消息类型,并且我有 2 种不同的相应 ActionBlock,它们应该只接收与它们应该接收的预期消息类型匹配的消息。我相信如果我只是将 IPropagatorBlock 链接到两个 Actionblocks 两个 ActionBlocks 将收到相同的消息?如何根据其标志正确分配每条消息(不要担心标志,标识是微不足道的,假设我随时知道哪个 ActionBlock IPropgatorBlock 想要将消息流式传输)?我正在努力正确设置数据流结构。我希望能够将数据块直接相互链接,而不必 Post()。那可能吗?

非常感谢这方面的任何帮助。

0 投票
1 回答
1350 浏览

c# - TPL 数据流在运行时中断 LinkTo()

我有一个TransformManyBlock<Tin, Tout>并且在运行时添加消费者(ActionBlocks)到LinkTo(...).

  1. TransformManyBlock 是否是正确的数据流块来消费元素,转换它们,然后输出(与输入相同数量的元素输出)给多个消费者(每个链接到的消费者像广播一样消费相同的元素)?我故意不选择 BroadCastBlock,因为它似乎无法像 BufferBlock 一样转换元素。

  2. 我想知道如何在运行时取消链接消费者(此处为 ActionBlocks)?据我所知,LinkTo() 似乎没有提供这样的功能。