问题标签 [tpl-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2834 浏览

c# - TPL 数据流,替代 JoinBlock 限制?

我寻找可以通过 n-TransformBlocks 链接到 JoinBlock 的替代方法,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类集合传递到另一个数据流块。

JoinBlock 可以很好地完成这项工作,但仅限于连接最多 3 个源块。它还存在很多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让从 TransformBlocks 返回的 Tasks 并等到所有 TransformBlocks 都有完成的任务才能在接受之前传递Task<item>

有什么替代的想法吗?在传递加入的项目集合之前,我可能有 1-20 个这样的转换块,我需要将哪些项目连接在一起。每个变换块保证为每个“变换”的输入项准确返回一个输出项。

编辑:要求澄清:

根据我之前的一个问题,我将 JoinBlocks 设置如下:

0 投票
1 回答
432 浏览

c# - 并发框架(TDF),这里需要集合的深层副本吗?

我对传递到广播块的列表集合有疑问。这是我到目前为止所拥有的(伪代码,因为完整的代码库太长了):

我的问题是,在当前的实现中,tempBCB我在 final 中得到了奇怪的结果TransformBlock<TInput, TOutput>

例如,Dictionary<int, IParentrOrder>作为元组一部分的集合大小不相等,即使实现tfb1tfb2100% 相同。

实现中被注释掉的行tempBCB对广播列表做了一个深拷贝,这似乎解决了问题,但问题是这个深拷贝使我的代码慢了大约 10 倍,这是我需要找到的数量级一个不同的解决方案。

首先,我不确定这是问题所在,还是只是速度变慢导致并发操作按预期执行,即使错误仍然隐藏在那里。

其次,如果广播块中缺少深拷贝会导致这些问题,我怎样才能让它更快?

这是我的深拷贝代码:

我可能会将 aQuote[]而不是馈List<Quote>入广播块,但我看不出它如何有助于加快深度复制的性能。

我的问题是:

  • 深拷贝问题是这里的真正问题吗(我怀疑,因为List<Quote>任何转换块都不会改变流入广播块的 , )?
  • 如果是,为什么以及如何使深层副本更有效?
0 投票
1 回答
4053 浏览

c# - 在 ActionBlock 中等待异步 lambda

我有一个带有 ActionBlock 的类 Receiver:

ActionBlock 的 Register-Action 是带有 await-Statement 的异步方法:

现在我想做的是同步等待(如果设置了条件),直到操作方法完成以获得独占行为:

问题是当“等待 Task.Delay(500);” 语句被执行,“receiver.Post(5).Wait();” 不再等待。

我尝试了几种变体(TaskCompletionSource、ContinueWith、...),但它不起作用。

有谁知道如何解决这个问题?

0 投票
0 回答
1549 浏览

c# - 如何在运行时动态更改批块的批大小?

我在 tpl 数据流中有一个批处理块,并且有几个目标块链接到批处理块。但是,目标块的数量会动态变化,因此批次的大小也会随之变化。问题是必须在初始化批处理块时提供批处理大小,我看不到稍后调整它的方法。任何想法如何解决这个问题?是取消链接的唯一方法(处理所有到批处理块和批处理块的链接),用新的批处理大小重新初始化批处理块,然后再次链接?我可以这样做,但是如何确保旧批次和新批次不会混在一起呢?

例如,如果我有 2 个转换块流式传输到批处理块,现在有一个额外的转换块并且想要将批处理大小增加到 3,我如何确保在增加之前处理所有先前的批处理以确保同步行为?关键是所有变换块都获得完全相同的项目,并且这些变换块的输出应该以只有那些匹配相同输入的输出被批处理的方式进行批处理。

这是我想要的示例:

用于转换块的恒定整数流:1,2,3,[批量大小增加的点],4,5,...

让变换块输出他们得到的东西,比如 1 => 1

所以 batchblock 应该像这样输出:[1,1], [2,2], [3,3], [change of batch size], [4,4,4], [5,5,5],.. .

这是我当前的代码:

0 投票
1 回答
2056 浏览

c# - TPL Dataflow,对核心设计感到困惑

我一直在使用 TPL Dataflow,但遇到了一个我无法解决的问题:

我有以下架构:

BroadCastBlock<List<object1>>-> 2 个不同的TransformBlock<List<Object1>, Tuple<int, List<Object1>>>-> 都链接到TransformManyBlock<Tuple<int, List<Object1>>, Object2>

我在链末端的 TransformManyBlock 中改变 lambda 表达式:(a) 对流式元组执行操作的代码,(b) 根本没有代码。

在 TransformBlocks 中,我测量从第一项到达开始到 TransformBlock.Completion 指示块完成时停止的时间(broadCastBlock 链接到 transfrom 块,propagateCompletion 设置为 true)。

我无法调和的是,为什么 (b) 情况下的 transformBlocks 完成速度比 (a) 快 5-6 倍。这完全违背了整个 TDF 设计意图的意图。转换块中的项目被传递到 transfromManyBlock,因此 transformManyBlock 对影响转换块何时完成的项目所做的一切都无关紧要。我看不出transfromManyBlock中发生的任何事情可能与前面的TransformBlocks有关的单一原因。

谁能调和这个奇怪的观察?

这是一些显示差异的代码。运行代码时,请确保更改以下两行:

至:

为了观察前面的 transformBlocks 在运行时的差异。

*编辑:我发布了另一个代码片段,这次使用值类型的集合,我无法重现我在上面的代码中观察到的问题。是否是传递引用类型并同时对它们进行操作(即使在不同的数据流块中)可能会阻塞并导致争用?*

0 投票
1 回答
2823 浏览

c# - 自定义动作块

我想实施一个优先的ActionBlock<T>. 这样我就可以有条件地TInput使用Predicate<T>.
我阅读了 Parallel Extensions Extras SamplesGuide to Implementing Custom TPL Dataflow Blocks
但是仍然不知道我该如何实现这种情况。
- - - - - - - - - - - - - - 编辑 - - - - - - - - - - - ------
有一些任务,其中5个可以同时运行。当用户按下按钮时,一些(取决于谓词函数)任务应该以最高优先级运行。
事实上我写了这段代码

但似乎优先级根本没有生效。
---------------------------------------- 编辑 ------------------
我找到了事实上,当我使用这行代码时:

导致应用程序正确观察Tasks的优先级,但一次只能执行一个任务,同时使用流程中显示的第一个代码块,导致应用程序同时运行5个任务但优先级顺序不合适。

更新:
要 svick 的坦克,我应该MaxMessagesPerTask指定taskSchedulerLow.

0 投票
1 回答
4017 浏览

.net - 使用 TPL 数据流创建消息总线

我一直在寻找一个轻量级的、进程中的异步消息总线,并遇到了 TPL 数据流。

我当前的实现如下(完整示例位于https://gist.github.com/4416655)。

我有一些关于在消息传递场景中使用 TPL 数据流的一般性问题。

  • 是否BroadcastBlock<T>推荐同时向多个处理程序发送消息的来源?这是我根据这篇文章得出的结论。
  • 在我的实现中,我对所有消息类型使用单个BroadcastBlock<T>实例。在处理大量消息时这会导致问题吗?我应该为每种消息类型创建一个单独的实例吗?
  • BroadcastBlock<T>始终存储最后发送的项目。这意味着任何新的订阅(链接)都将自动传递此消息。有可能改变这种行为(新订阅应该只接收新消息)。
  • 在我的测试应用程序中,我在第一个处理程序中引入了延迟:

    发送消息时,我希望看到每条消息一一输出到控制台,增量为 2s。相反,在 2 秒后,所有消息都立即输出。我假设这是由于底层调度程序执行的并行性,但我很好奇如何更改这些设置(设置MaxDegreeOfParallelism = 1没有区别)。

  • 最后,虽然SendAsync允许我等待消息的发送,但它不允许我等待目标(的ActionBlock<T>)完成。我认为这是PropagateCompletion会做的,但似乎并非如此。理想情况下,我想知道消息的所有处理程序何时执行。

更新

我没有得到预期行为的原因Task.Delay是这延迟了每个处理程序的执行,而不是所有处理程序的处理。Thread.Sleep是我需要的。

0 投票
1 回答
2172 浏览

.net - TPL 数据流生产者消费者模式

刚刚使用 TPL DataFlow 编写了一个示例生产者消费者模式。我在这里有一些基本问题。

  1. 只有在生产者发布了所有项目后,消费者才处于活动状态。异步是否意味着生产和消费任务都可以并行运行。

  2. 给定消费者的睡眠时间,以验证其是否阻塞了其他数据项。它似乎是按顺序执行的,没有得到任何并行性。

我在这里做错了吗?

0 投票
1 回答
498 浏览

c# - 何时使用 ISourceBlock 或 IObservable

我需要使用推模型返回一组项目(而不是拉,如 IEnumerable)。但是,我不确定是否应该使用响应式扩展中的 IObservable 或 TPL 数据流中的 ISourceBlock。

它们看起来很相似,在什么情况下应该选择一个而不是另一个?

0 投票
1 回答
511 浏览

c# - 这个构造函数的用途是什么:ActionBlock构造函数(函数)

我之前通过另一个构造函数使用了 ActionBlock:

ActionBlock<TInput> Constructor (Action<TInput>)

但是对于标题中的返回类型任务,我不确定 ActionBlock 对返回的任务做了什么。我认为这是为了以某种方式等待提供给构造函数的异步委托?我能抓住它吗?