问题标签 [tpl-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
2043 浏览

c# - 使用 TPL 数据流在预定义块之上创建可重用的处理逻辑?

我喜欢 TPL 数据流。

嗯,一个有趣的设计选择是,大多数预定义的块使用委托来允许我们实现处理逻辑。这在简单的场景中看起来不错。但是让我们想想现实世界的大型应用程序,它需要模块化和封装。我发现使用 DELEGATE 方法编写结构良好的应用程序既困难又不自然。

例如,如果我想要的只是 aMultiplyIntByTwoTransformBlock和 aNoOpActionBlock作为可重用的类 TYPE(不是实例)。我如何实现它?我希望我可以从TransformBlock/继承ActionBlock并说,重写一些Process()方法来实现这一点。但是预定义的块是密封的。他们只接受代表。

我知道我可以从头开始创建自定义块,但显然它对我来说太复杂了,因为我需要的只是在预定义的基础上进行一些自定义。

那么,我该如何实现我的目标呢?

更新:我并不是说有些事情代表不能做。我是说在许多场景中以模板方法模式公开抽象块会更好。说,我希望我可以利用多态性编写一个 AbstractMultiplyBlock 和 MultiplyByTwoBlock 和 MultiplyByThreeBlock。不幸的是,代表们没有提供这种数据和逻辑的可重用性。

0 投票
1 回答
314 浏览

c# - 我应该使用 async 还是 Task 来处理大量并发操作?

我正在编写一个将处理大量输入文件的 Windows 服务。

我的问题是我是否应该使用asyncand await,或者我是否应该使用它来创建一个自定义库Task,这可能会限制并发线程的数量。

我知道一个事实,如果我排队 200 个线程,完成所有事情比让 10 个同时运行需要更长的时间(各种争夺资源、CPU 中的上下文切换等)

所以我的问题是这些新奇的功能是否async更具await运行时弹性,还是我应该编写自己的自定义库?

0 投票
0 回答
647 浏览

mono - 我在哪里可以获得单声道的 TPL.Dataflow?

我在 Macbook 上使用 mono-3.0.10。在 mono 3.0 发行说明中,提到了 Tpl.Dataflow,我还注意到 Tpl.Dataflow 的源代码在 mono 的 github repo master 分支中。

但我在本地单声道安装中找不到它。

我试过从安装程序重新安装单声道,但没有运气。

然后,我尝试了从nuget下载的dll,出现以下错误:

nuget、net45、netcore45和portable-net45有3个版本。我都试过了,但仍然没有运气。

那么,我在哪里可以找到迄今为止 Tpl.Dataflow 与单声道一起使用的正确更新?

0 投票
0 回答
335 浏览

c# - 将 TPL 数据流管道的结果与初始帖子相关联

我正在创建一个数据流管道,该管道负责异步处理发送到HttpListener. 为了响应请求,我可能不得不做一些耗时的操作,例如分析或从数据库中提取大量数据,因此采用了数据流方法。我正在使用 将HttpListener.GetContextAsync传递HttpListenerContext到管道中的第一个块,然后它将处理请求并生成响应。

我遇到的问题是我需要拥有原始HttpListenerContext对象才能将响应发送回客户端。我的第一个想法是,我可以将对象通过 Tuple 内的整个管道传递,但很快就会开始看起来和感觉非常混乱。

最终,从设计的角度来看,我希望能够传递HttpListenerContext到管道并在流程结束时收到一个响应,然后我可以发回该响应,而无需将对象也传递给整个事物。是否可以将管道的最终结果关联回发布到其中的初始对象?

0 投票
1 回答
2149 浏览

.net - 使用基于任务的异步模式 (TAP) 的高效套接字 I/O

我正在开发一个使用原始 TCP 套接字与中央服务器通信的客户端应用程序。应用程序消息被序列化,然后加上长度前缀以创建传递到 TCP 流中的帧。

处理此问题的一种经典方法是直接在套接字类上调用 Receive 或 BeginReceive,在回调上反序列化消息,将消息传递到单独的队列中以供另一个线程处理,然后让回调再次在套接字上开始另一个接收。

这种方法的简单实现对我来说并不理想——它将消息序列化和反序列化与套接字紧密耦合,并且需要相当多的“管道”才能让队列在不同的线程/回调中发挥良好的作用。它也有点抽象——它需要调用代码了解底层套接字,而不是输入和输出消息的“数据流”。

鉴于我完全在 .NET 4.5 中工作,使用 TPL (TaskFactory.FromAsync) 包装 Socket 的 Begin 和 End 异步方法是一个明显的选择。但是,由于多种原因,我不清楚如何从这一点着手:

  1. 我需要一个永远不会完成的异步“任务”来接收数据。只要连接了套接字,我就想要处理消息流。任何中断(断开连接、套接字错误或取消请求)都将是异常,而不是传统的任务完成。根据 Stephen Toub ( http://blogs.msdn.com/b/pfxteam/archive/2011/10/02/10218999.aspx ) 的说法,我应该始终完成我的任务。这产生了一些问题——传统意义上的套接字接收永远不会完成。斯蒂芬似乎在他的“等待套接字操作”帖子中与自己略有矛盾,他在其中展示了一个套接字读取,它永远不会在没有套接字错误的情况下完成(http://blogs.msdn.com/b/pfxteam/archive/2011/12/15 /10248293.aspx)。
  2. 我需要一种同步“排队”要发送的数据的方法。调用者应该能够发送要传输的消息而不会阻塞,并且消息应该通过套接字顺序传输。换句话说,由于消息框架,一次只能在套接字本身上发送一个。TPL 数据流是否适合,或者我应该使用不同的排队模式?
  3. 我希望将消息序列化和消息传输之间的关注点完全分开。

我还没有看到很多使用这种策略的示例,只有“直接”套接字 I/O 或琐碎的实现。我的直觉告诉我,TPL Dataflow 非常适合,因为序列化和反序列化可以流水线化。

我不清楚如何将有效的无限链接收任务连接到 TPL 数据流或类似的东西。

有任何想法吗?

0 投票
1 回答
960 浏览

c# - 保留内存的数据流管道

我创建了一个由 4 个块(包括一个可选块)组成的数据流管道,它负责通过 HTTP 从我的应用程序接收查询对象并从数据库中检索信息,对该数据进行可选转换,然后编写在 HTTP 响应中返回信息。

在我完成的一些测试中,我一直从数据库中提取大量数据(57 万行),这些数据存储在 List 对象中并在不同的块之间传递,看起来即使在最后一个块已经完成之后完成了内存没有被释放。任务管理器中的 Ram 使用量将飙升至 2 GB 以上,当列表命中每个块时,我可以观察到几个大的峰值。

我的块的签名如下所示:

它们的链接如下:

是否有可能我可以设置管道,这样一旦每个块都完成了列表,他们就不再需要保留它,以及一旦整个管道完成,内存就会被释放?

0 投票
3 回答
1603 浏览

c# - ITargetBlock 中的重试策略

我需要在工作流中引入重试策略。假设有 3 个块以这种方式连接:

所以有一个缓冲区来累积数据,然后将其发送到一次处理不超过 3 个项目的转换块,然后将结果发送到操作块。

在处理转换块期间可能会出现瞬态错误,如果错误多次出现瞬态,我想重试该块。

我知道块通常是不可重试的(传递到块中的委托可以重试)。一种选择是包装传递的委托以支持重试。

我也知道有一个非常好的库TransientFaultHandling.Core可以为瞬态故障提供重试机制。这是一个很棒的图书馆,但对我来说不是。如果我将传递给转换块的委托包装到RetryPolicy.ExecuteAsync方法中,则转换块内的消息将被锁定,并且在重试完成或失败之前,转换块将无法接收新消息。想象一下,如果所有 3 条消息都进入重试(假设下一次重试将在 2 分钟内)并且失败,则转换块将被卡住,直到至少有一条消息离开转换块。

我看到的唯一解决方案是扩展TranformBlock(实际上ITargetBlock也足够了),并手动重试(比如从这里开始):

ig 将消息再次延迟放入转换块中,但在这种情况下,重试上下文(剩余的重试次数等)也应传递到此块中。听起来太复杂了...

有没有人看到一种更简单的方法来为工作流块实施重试策略?

0 投票
2 回答
4196 浏览

c# - 实现可重试块的正确完成

Teaser:伙计们,这个问题不是关于如何实施重试策略的。这是关于正确完成 TPL 数据流块。

这个问题主要是我之前的问题Retry policy in ITargetBlock的延续。这个问题的答案是@svick 的智能解决方案,它利用TransformBlock(源)和TransformManyBlock(目标)。剩下的唯一问题是以正确的方式完成这个块:等待所有重试首先完成,然后完成目标块。这是我最终得到的(这只是一个片段,不要过多关注非线程安全retries集):

这个想法是执行某种轮询并验证是否仍有消息等待处理并且没有消息需要重试。但在这个解决方案中,我不喜欢轮询的想法。

是的,我可以将添加/删除重试的逻辑封装到一个单独的类中,甚至可以在重试集为空时执行一些操作,但是如何处理target.InputCount > 0条件?当块没有待处理的消息时,不会调用这样的回调,因此似乎target.ItemCount在具有小延迟的循环中进行验证是唯一的选择。

有人知道实现这一目标的更聪明的方法吗?

0 投票
1 回答
766 浏览

c# - ActionBlock 似乎有时会忽略 MaxDegreeOfParallelism

发布到 ActionBlock 时,我们看到了意外行为,即使 MaxDegreeOfParallelism 为 1,并行处理似乎也在发生。这是场景。

发布到 ActionBlock 的类如下所示:

在下游,我们将字节反序列化为对象,并根据它们的类型将这些对象(我们称之为通知)传递给处理程序:

这个处理程序有一个致命的错误:InitialNotifications 在其对应的 SecondNotifications 之前进入,但 HandleInitialNotification 在退出之前等待 HandleSecondNotification,因此线程永远不会到达 HandleSecondNotification。

通常,我们看到 HandleInitialNotification 一直阻塞,直到它等待 HandleSecondNotification 超时,然后在同一线程上处理挂起的 SecondNotification 继续执行。这是我们通常在日志中看到的内容:

这不是代码的预期工作方式,但考虑到它的编写方式,它应该总是超时等待 SecondNotification。但是,我们偶尔也会看到 HandleInitialNotification 在超时之前完成,HandleSecondNotification 在不同的线程上被及时处理:

由于我们使用默认的 ActionBlock,MaxDegreeOfParallelism 应该为 1。那么,第二个线程(源自 ActionBlock)如何在发布到 ActionBlock 的原始线程阻塞时接收 SecondNotification?

0 投票
1 回答
617 浏览

c# - TPL 数据流:“成为”语义

据我了解,TPL 数据流为 .NET 程序员提供了 Actor 编程模型(不涉及以前可用的 3rd 方解决方案)。Actor 模型本身声明每个 Actor 可以支持三种基本操作:“发送”、“创建”和“成为”。在 TPL Dataflow 中处理“成为”语义的“正确”方法是什么?

请考虑以下示例:

我发现这种“变得”笨拙的方式:我没有改变演员的行为,而是改变了演员实例本身(这会导致不希望的效果)。什么是“正确”的方式?

还有一个问题:据我所知,标准的 TDF 块要么永远不会将消息传递给链接的笔记(例如 ActionBlock,如果这样的逻辑不是手工编写的),要么总是这样做(大多数块)。我是否正确,仅在某些情况下(并非总是)发送消息的逻辑应作为自定义块实现?