1

为什么njoin在处理之前预取数据?这似乎是一个不必要的复杂化,除非它与流程的流程如何合并有关?

我有一个在生成新元素时运行效果的流。我想将效果保持在最低限度,因此每当 anjoin时,比如说maxOpen = 44 应该是同时生成的最大元素数(除非可以立即处理,否则不应生成任何元素)。

有没有办法优雅地解决这个问题njoin?现在我正在使用“票”的有界队列(只有在获得票后才会生成一个元素)。

4

1 回答 1

1

请参阅https://github.com/scalaz/scalaz-stream/issues/274,特别是来自 djspiewak 的以下评论。

“从概念层面来看,这里的问题是 Process 的“拉”模型和任何并发流合并所需的“推”模型之间的接口点。wye 和 njoin 都位于这个边界点,并通过以下方式“作弊”主动拉动它们的源进程以填充入站队列,将结果推入出站队列等待输出进程的拉动。(显然,wye 和 njoin 都通过 Actor 使他们的入站队列隐含)在大多数情况下,这非常有效它保留了用户关心的大部分属性(例如终止传播、背压等)。”

njoined 的第二个参数 maxQueued 限制了预取的数量。如果该参数为 0,则队列大小没有限制,因此预取也没有限制。调用 njoin 的 mergeN 的文档更多地解释了这种预取行为的原因。“在内部,mergeN 保留了一个小的缓冲区,该缓冲区可以提前读取等于活动源流数量的值。这并不意味着在这个预读缓存中咨询每个进程n,它只是在处理时尝试尽可能公平提供他们的AnsourceA以几乎相同的速度。”所以看起来 njoin 正在处理当所有源几乎同时提供一个值时会发生什么的问题,但它试图防止任何一个加入的流挤出较慢的流.

于 2015-09-09T03:28:09.753 回答