问题标签 [tpl-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1512 浏览

task-parallel-library - 优先的 TPL DataFlow BufferBlock

这应该是很自然的事情,我想知道是否有来自 TPL DataFlow 库的 Prioritized BufferBlock 的现成实现?

0 投票
0 回答
1413 浏览

c# - MVVM WPF应用中使用TPL Dataflow的真实应用场景

我可以请您指出一些示例,该示例展示了在数据服务中使用 TPL 数据流时如何在 MVVM 应用程序中使用 TPL 数据流。

在我的 MVVM 应用程序中,我将应用程序块分为 5 个不同的项目实现模型(负责更新模型的域和服务项目)、视图模型(应用程序项目)和视图(演示项目 - WPF 应用程序)。

使用控制台应用程序测试域时,服务和应用程序都可以正常工作。

现在我创建了 Winform 项目来测试我稍后将在 WPF 应用程序中放置的内容,以测试应用程序块,但没有任何效果。

我查看了 Async CTP 中的示例:(C# Dataflow) Real Estate,(C# Dataflow) Dining Philosophers。好吧,这些示例展示了如何使用 Dataflow,但代码放在 GUI 中,而不是在某些服务程序集中分开。

我的域和应用程序块非常复杂,我不想将它们放在 GUI 中。

我读到 TPL Dataflow 作为 CCR 的继任者,事实上几年前我已经在我的项目中使用了 CCR。CCR 在这种情况下为我工作。

0 投票
4 回答
3784 浏览

c# - 如果排队的项目数小于 BatchSize,如何在超时后自动调用 TriggerBatch?

使用 Dataflow CTP(在 TPL 中)

如果当前排队或推迟的项目数小于 BatchSize,是否有办法在超时后自动调用 BatchBlock.TriggerBatch?

更好的是:每次块收到新项目时,此超时应重置为 0。

0 投票
2 回答
598 浏览

task-parallel-library - rx 的 Observable.FromEventPattern 的 TPL 等价物是什么?

在 rx 你可以写:

然后订阅 observable 将 sqlDep 对象上的 OnChange 事件转换为 observable。

同样,如何使用任务并行库从 C# 事件创建任务?

编辑:澄清 Drew 指出然后由 user375487 明确编写的解决方案适用于单个事件。任务一完成……嗯,它就完成了。

可观察事件可以随时再次触发。它可以看作是一个可观察的流。TPL 数据流中的一种 ISourceBlock。但是在文档http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx中没有 ISourceBlock 的示例。

我最终找到了一个论坛帖子,解释了如何做到这一点:http ://social.msdn.microsoft.com/Forums/en/tpldataflow/thread/a10c4cb6-868e-41c5-b8cf-d122b514db0e

公共静态 ISourceBlock CreateSourceBlock(Action,Action,Action,ISourceBlock> executor){ var bb = new BufferBlock(); 执行者(t => bb.Post(t), () => bb.Complete(), e => bb.Fault(e), bb); 返回bb;}

我没有尝试过上面的代码。异步委托和 CreateSourceBlock 的定义可能不匹配。

0 投票
1 回答
2708 浏览

task-parallel-library - TPL Dataflow over Reactive Extensions (Rx) 的用例是什么

我正在专门研究以一种或其他方式编写一些信号处理算法,或者可能是这两者的某种组合。

性能不是一个大问题,表达意图的清晰性更为重要。

我希望实现以下“块”并组合它们:

  • 滤波器(FIR 和 IIR)
  • 相位检测器
  • 集成商
  • 搅拌机
  • 函数发生器
  • PLL(使用上述作为构建块)

我知道 Rx 可以被视为“Linq-to-streams”,而 TPL 是对并发的抽象。我还了解到 Rx 在内部使用 TPL 来管理其异步位,并且 TPL 数据流为 TPL 添加了可组合性。

所以两者都是异步的,都是可组合的,都是相当高级的(Rx moreso)。一般情况下和我上面的信号处理项目中应该在哪里使用它们?

0 投票
1 回答
567 浏览

c# - 新的 Tasks.Dataflow 块是否通过网络工作?

我想知道让新的 Tasks.Dataflow 块在网络上工作有多难。最好的方法是什么?使用 WCF RPC 链接块,还是使用一些 3rd 方消息传递/队列服务?

我还能以原子方式使用消息吗?我在使用 SQS 时遇到的问题之一是无法保证一条消息将只发送一次。

0 投票
1 回答
1858 浏览

c# - 明显的 BufferBlock.Post/Receive/ReceiveAsync 竞赛/错误

交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道......我并没有真正发挥 TplDataflow 的最大潜力。ATM 我只是BufferBlock用作消息传递的安全队列,其中生产者和消费者以不同的速率运行。我看到一些奇怪的行为让我不知道如何进行。

在上面的代码中(它是 2000 行分布式解决方案的一部分),Send每 100 毫秒左右定期调用一次。这意味着一个项目以每秒 10 次左右的速度被Post编辑。messageQueue这是经过验证的。但是,偶尔会出现ReceiveAsync在超时内没有完成(即Post没有导致ReceiveAsync完成)并且TimeoutException在 30 秒后被提升。此时,messageQueue.Count已是数百。这是出乎意料的。在较慢的发布速度(1 个帖子/秒)中也观察到此问题,并且通常发生在 1000 个项目通过BufferBlock.

所以,为了解决这个问题,我使用了以下代码,它可以工作,但在接收时偶尔会导致 1s 延迟(由于上述错误发生)

对我来说,这看起来像是 TDF 中的竞争条件,但我无法深入了解为什么在我BufferBlock以类似方式使用的其他地方不会发生这种情况。实验性地从ReceiveAsyncto改变Receive并没有帮助。我没有检查过,但我想孤立地看,上面的代码可以完美运行。这是我在“TPL 数据流简介” tpldataflow.docx中看到的一种模式。

我能做些什么来解决这个问题?是否有任何指标可以帮助推断正在发生的事情?如果我无法创建可靠的测试用例,我还能提供哪些信息?

帮助!

0 投票
2 回答
1343 浏览

c# - 处理大量文件

我目前正在从事一个涉及索引大量文件(240k)的研究项目;它们主要是 html、xml、doc、xls、zip、rar、pdf 和文本,文件大小从几 KB 到超过 100 MB。

提取所有 zip 和 rar 文件后,我最终得到一百万个文件。

我正在使用支持 TPL Dataflow 和 Async CTP V3 的 Visual Studio 2010、C# 和 .NET 4.0。为了从这些文件中提取文本,我使用 Apache Tika(使用 ikvm 转换)并使用 Lucene.net 2.9.4 作为索引器。我想使用新的 TPL 数据流库和异步编程。

我有几个问题:

  1. 如果我使用 TPL,我会获得性能优势吗?它主要是一个 I/O 过程,据我了解,当您大量使用 I/O 时,TPL 不会提供太多好处。

  2. 生产者/消费者方法是否是处理此类文件处理的最佳方式,还是有其他更好的模型?我正在考虑使用阻塞集合创建一个具有多个消费者的生产者。

  3. TPL 数据流库对这种类型的流程有用吗?似乎 TPL 数据流最适合用于某种消息传递系统......

  4. 在这种情况下我应该使用异步编程还是坚持同步?

0 投票
1 回答
249 浏览

.net - 此块只能与创建它的源一起使用

我在这个初始网络中使用了 TPL 数据流:

当消息到达 b 时,b 会创建一个带有过滤器的新转换块并​​将其附加到自身(它不会对每条消息都这样做)。

网络变成这样:

运行几次后,网络变成这样:

这很好用,并且是我发现的唯一解决方案,可以获得“开关”块的默认操作。

但是当通过调用 customSource.Complete() 完成源时,它会抛出异常:

ArgumentException:此块只能用于1.System.Threading.Tasks.Dataflow.ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock在 System.Threading.Tasks.Dataflow.Internal.SourceCore 1 目标的 System.Threading.Tasks.Dataflow.DataflowBlock.FilteredLinkPropagator 1 源(布尔型 consumeToAccept)中创建它的源,1.OfferMessageToTarget(DataflowMessageHeader header, TOutput message, ITargetBlock布尔型& messageWasAccepted) 在 System.Threading.Tasks.Dataflow.Internal.SourceCore 1.OfferToTargets(ITargetBlock1 linkToTarget) 在 System.Threading.Tasks.Dataflow.Internal.SourceCore`1.OfferMessagesLoopCore()

0 投票
1 回答
4601 浏览

c# - FromCurrentSynchronizationContext,我错过了什么吗?

我目前正在处理一个从包含数千个文件的大型二进制文件中读取的应用程序,每个文件都由应用程序中的某个其他类处理。此类返回一个对象或 null。我想在主窗体上显示一个进展,但由于某种原因,我无法理解它。

上面的代码冻结了我的表单,即使我使用“TaskScheduler.FromCurrentSynchronizationContext”,为什么?因为当我使用下面的代码时,我的表单更新正常

我是整个 TPL 数据流的新手,所以我希望有人可以分享一些关于为什么在第二个代码片段中它有效而在第一个片段中它没有的原因。

亲切的问候,马丁