问题标签 [scalaz-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
72 浏览

scala - 控制进程的吞吐量

我正在尝试Process[F, A]使用计时器控制 a 的吞吐量Process

但是编译器说这p2是一个Process[Object, Int]. 根据zipWith()签名,它应该是一个Process[Task, Int].

如何限制Process[F, A]if Fis not a的输出Task?我试过了,sleepUntil()但我有类似的问题。

0 投票
1 回答
283 浏览

scala - 为什么 awakeEvery 已从 scalaz-stream 中删除

我发现现代 scalaz-stream 中没有更多awakeEvery内容了。scalaz.stream.Process那么如何运行带有句号的东西呢?

0 投票
0 回答
50 浏览

scalaz-stream - 使用 Scalaz 流,如何根据先前流程的内容评估流程?

我有一个看起来像这样的流:

阿巴巴巴巴巴……

其中 A 和 B 由一个或多个 Process 组成。根据 A 的内容,我们想对 B 做一些事情,或者将其传递到水槽或排水管。

我需要能够在不评估 B 的情况下做出关于 B 的决定,因为它可能相当大的内存不友好 - GB

我想知道我是否需要类似于 Eric 博客文章中概述的内容:http:
//etorreborre.blogspot.com.au/2014/03/streaming-with-previous-and-next.html

0 投票
1 回答
344 浏览

scala - How to convert Iterator to scalaz stream?

Suppose I've got an Iterator[A]. I would like to convert it to Process[Nothing, A] of scalaz stream.

How would you implement foo ?

0 投票
2 回答
47 浏览

scalaz - side-effects for `wye` combinators when using `halt` from scalaz-stream

filter (which uses halt inside) terminates other branch even if it has some side-effects:

Seems logical as there is no value to be returned to yip after that filter. But what about side-effects, specified with observe?

My current solution is to use flatMap to specify default value:

But maybe there is a way to use filter?

P.S. merge combinator executes side-effects for other branch (as it doesn't require value to be returned), but it doesn't wait for other branch if one halts (even if it has side-effects).

0 投票
1 回答
170 浏览

scala - 用于增长列表的 scalaz 流结构

我有一种预感,我可以(应该?)使用 scalaz-streams 来解决我的问题,就像这样。

我有一个起始项目 A。我有一个接受 A 并返回 A 列表的函数。

我有一个以 1 项(起始项)开头的工作队列。当我们处理 ( doSomething) 每个项目时,它可能会将许多项目添加到同一工作队列的末尾。然而,在某些时候(在数百万个项目之后),我们doSomething处理的每个后续项目将开始向工作队列添加越来越少的项目,最终不会添加新项目(doSomething 将为这些项目返回 Nil)。这就是我们知道计算最终将终止的方式。

假设 scalaz-streams 适用于此,请给我一些提示,说明我应该考虑哪些整体结构或类型来实现它?

一旦完成了使用单个“worker”的简单实现,我还想使用多个 worker 并行处理队列项,例如拥有 5 个 worker 池(每个 worker 将其任务分配给代理进行计算doSomething)所以我需要在这个算法中处理效果(比如工人失败)。

0 投票
0 回答
74 浏览

scala - “积累”scalaz-stream 频道

我正在尝试实现一个 scalaz-stream 通道,该通道累积有关它接收到的事件的统计信息,一旦完成,就会发出最终统计信息。

举一个具体的简化示例:假设您有一个Process[Task, String]其中每个字符串都是一个单词的地方。我想要一个Channel[Task, String, (String, Int)],当应用于初始过程时,会耗尽它,计算每个单词出现的次数,然后发出它。

我意识到这是通过折叠微不足道的:

我正在尝试编写的是标准累加器的集合,然后我可以将我的进程通过管道传递——而不是显式折叠,比如说,我会写:

不过,我有点不知所措-如果不共享状态,我无法弄清楚如何做到这一点。编写一个Sink累积到 a 的 aMap[String, Int]相当简单,但是一旦 sink 终止,就无法获得 map 的最终状态并发出它。

0 投票
1 回答
43 浏览

scalaz-stream - 如何将 scalaz-stream Process[F, Option[T]] 转换为 Process[F, T]?

我有一个 scalaz-stream 进程:

如何摆脱选项?

到目前为止,我使用了收集,但感觉并不优雅:

有没有更好的办法?

0 投票
1 回答
73 浏览

scala - scalaz-stream 根据计算值消耗流

我有两个流,我希望能够根据我每秒运行的计算只消耗一个x

我想我基本上需要创建第三个tick流 - 类似于every(3.seconds)- 进行计算,然后在其他两个之间进行某种切换。

我有点卡在这里(而且我才刚刚开始玩弄 scalaz-stream)。

谢谢!

0 投票
1 回答
125 浏览

scala - Scalaz-stream 括号提前关闭资源

考虑这段代码:

我想使用 Process.bracket 来管理资源:

并对一系列数字运行计算:

到目前为止一切顺利......我得到了这个结果:

现在,我想并行运行昂贵的计算:

我得到了这个结果:

显然,括号的释放函数被调用得太早并且过早地关闭了资源。我究竟做错了什么?并行计算完成后有没有办法关闭资源?

谢谢。