问题标签 [scalaz-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
253 浏览

scala - 您如何使用 scalaz 流写入和读取外部进程

我希望能够将数据从 scalaz 流发送到外部程序,然后在未来大约 100 毫秒内得到该项目的结果。虽然我可以通过以下代码通过Sink输入流压缩输出流Process然后丢弃Sink副作用来做到这一点,但我觉得这个解决方案可能非常脆弱。

如果外部程序的输入项之一有错误,那么一切都将不同步。我觉得最好的选择是将某种增量 ID 发送到外部程序中,它可以在将来回显,这样如果发生错误,我们可以重新同步。

我遇到的主要麻烦是将数据发送到外部程序的结果与程序Process[Task, Unit]的输出结合在一起Process[Task, String]。我觉得我应该使用来自wyn但不确定的东西。

0 投票
1 回答
240 浏览

scala - 使用 scalaz.stream 持续获取数据库结果

我是 scala 的新手,对 scalaz 非常陌生。通过不同的 stackoverflow 答案和一些手把手,我能够使用 scalaz.stream 来实现一个可以持续获取 twitter API 结果的 Process。现在我想对存储 twitter 句柄的 Cassandra DB 做同样的事情。

获取推特结果的代码在这里:

我打算做的是使用

持续获取 Cassandra 结果(使用 awakeEvery)并将它们发送给演员以运行上述 twitter 获取代码。

我的问题是,用 scalaz.stream 实现这一点的最佳方法是什么?请注意,我希望它获得所有数据库结果,然后在再次获得所有数据库结果之前有一个延迟。我应该使用与上面的 twitter 获取代码相同的架构吗?如果是这样,我将如何创建一个不需要输入的 channel.lift?scalaz.stream 中是否有更好的方法?

提前致谢

0 投票
1 回答
127 浏览

scala - 进程中断时如何终止控制台输入

我正在使用 Scalaz Stream 库并尝试创建一个简单的控制台应用程序。

我按照教程scalaz 流,他们有一个控制台读写的例子。

但是我遇到了一个奇怪的问题,我不确定如何解决。

这是我的代码:

当我运行它时,输出看起来像这样

1 很好,2 5 秒后超时失败,然后 3 需要输入两次。

问题是当进程按时失败时,控制台输入没有被解除阻塞(没有被杀死)并且仍在等待用户的输入。

我试图用期货重现它并面临同样的问题。

谢谢你。

0 投票
0 回答
54 浏览

scalaz-stream - scalaz-stream的内存效率

让我们考虑其 github 页面中 README 中的第一个示例:

它在内存使用方面的效率如何?在将结果传递给下一步之前,它会在每一步缓冲内存中的全部内容吗?或者它会以流式方式完成,这意味着它将有恒定的内存使用?

此外,scalaz-stream 是我们可以考虑在生产环境中使用的高质量库吗?

0 投票
2 回答
69 浏览

scala - 如何在 scalaz-stream 中实现 receiveAvailable 转换器

简洁版本:

我想实现一个函数,该函数返回一个等待“发出”值块的转换器。

我想到的功能将具有以下签名:

细节:

我的理解是,我可以使用这个函数来实现以下我认为非常有用的函数:

[更新] 更多详情

我的最终目标(稍微简化)是实现以下功能:

该函数对语音识别服务进行外部调用。因此,Byte对流中的每一个都进行网络调用是不合理的。在进行网络调用之前,我需要将字节组合在一起。我可以做audio一个Process[Task, ByteVector],但这需要测试代码来知道函数支持的最大块大小,我宁愿由函数本身管理。此外,当在服务内部使用此服务时,该服务本身将接收具有给定音频大小的网络调用,我希望该chunkXXX功能能够智能地进行分块,以便它不会保留已经存在的数据可用的。

基本上,来自网络的音频流将具有格式Process[Task, ByteVector]并将被翻译成Process[Task, Byte]by flatMap(Process.emitAll(_))。但是,测试代码将直接生成 aProcess[Task, Byte]并将其输入voiceRecognition. 从理论上讲,我相信应该有可能给定适当的组合器来提供voiceRecognition对这两个流都做正确事情的实现,我认为chunkByEffect上面描述的功能是关键。我现在意识到我需要 chunkByEffect 函数minmax参数来指定分块的最小和最大大小,而与Task生成字节的底层无关。

0 投票
0 回答
39 浏览

scala - 如何在 scalaz-stream 中实现 wye.mergeLeftBiased

我在使用 scalaz-stream 实现以下功能时遇到问题:

我有一个尝试实现它的分支,但不幸的是它即使在简单的情况下也可以工作(如测试用例所示)。在 scalaz-stream 中的实现wye并不容易阅读(至少对我来说)。

https://github.com/jedesah/scalaz-stream/tree/topic/mergeLeftBiased

0 投票
1 回答
42 浏览

scalaz-stream - scalaz流中的异步“节点”

我有一个Process[Task, A],我需要A => B在每个流上运行一个函数,其运行时间范围从瞬时到非常长,A以产生一个Process[Task, B].

问题是,我想尽快处理每个A结果,并在获得结果后立即传递结果,而不管收到 sExecutionContext的顺序如何。A

一个具体的例子是下面的代码,我希望立即打印所有奇数,大约 500 毫秒后打印偶数。相反,会打印(奇数,偶数)对,并以 500 毫秒的暂停交错:

0 投票
2 回答
83 浏览

scala - Scalaz 流分块 UP 到 N

给定一个这样的队列:

我想拉出这个队列并将其流式传输到下游接收器,以 UP 到 100 的块。

有点工作,但如果我说 101 条消息,它不会清空队列。将留下 1 条消息,除非再推入 99 条消息。我想尽可能多地从队列中取出 100 条消息,以我的下游进程可以处理的速度尽可能快。

是否有现有的组合器可用?

0 投票
1 回答
55 浏览

scala - nondeterminism.njoin:maxQueued 和预取

为什么njoin在处理之前预取数据?这似乎是一个不必要的复杂化,除非它与流程的流程如何合并有关?

我有一个在生成新元素时运行效果的流。我想将效果保持在最低限度,因此每当 anjoin时,比如说maxOpen = 44 应该是同时生成的最大元素数(除非可以立即处理,否则不应生成任何元素)。

有没有办法优雅地解决这个问题njoin?现在我正在使用“票”的有界队列(只有在获得票后才会生成一个元素)。

0 投票
1 回答
29 浏览

scalaz - 从初始值开始应用 Channel N 次

我有一个功能f和一个频道c

我想将函数连续应用f任意次数(或无限期)到先前计算的输出。我提供初始值。

我可以定义一个流程p

但这只会执行一次。

我怎样才能继续应用c到最后一次计算的输出?