问题标签 [scalaz-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4019 浏览

scala - 使用 Scalaz Stream 解析任务(替换 Scalaz Iteratees)

介绍

我在许多项目中使用Scalaz 7的迭代器,主要用于处理大型文件。我想开始切换到 Scalaz,它旨在替换 iteratee 包(坦率地说,它缺少很多部分并且使用起来有点痛苦)。

流基于机器(迭代思想的另一种变体),它也已在 Haskell 中实现。我使用了一点 Haskell 机器库,但是机器和流之间的关系并不完全明显(至少对我而言),流库的文档仍然有点稀疏

这个问题是关于一个简单的解析任务,我希望看到使用流而不是迭代器来实现。如果没有其他人超过我,我会自己回答这个问题,但我确信我不是唯一一个正在进行(或至少考虑)这种转变的人,因为无论如何我都需要完成这个练习,我想我还不如在公共场合做。

任务

假设我有一个文件,其中包含已标记化并用词性标记的句子:

每行有一个标记,单词和词性由一个空格分隔,空行表示句子边界。我想解析这个文件并返回一个句子列表,我们不妨将其表示为字符串元组的列表:

像往常一样,如果我们遇到无效输入或文件读取异常,我们希望优雅地失败,我们不想担心手动关闭资源等。

一个迭代的解决方案

首先是一些通用的文件读取内容(它确实应该是 iteratee 包的一部分,它目前不提供任何远程这个高级别的东西):

然后我们的句子阅读器:

最后是我们的解析动作:

我们可以证明它有效:

我们完成了。

我想要的是

或多或少是使用 Scalaz 流而不是迭代器实现的相同程序。

0 投票
1 回答
773 浏览

performance - 使用 scalaz-stream 计算行数的性能

我已将Scala 函数式编程linesGt1第 15 章开头的命令式行计数代码(请参阅 参考资料)翻译成使用scalaz-stream的解决方案(请参阅参考资料)。然而,它的表现并不是那么出色。命令式代码比我的 scalaz-stream 解决方案快大约 30 倍。所以我想我在做一些根本错误的事情。如何提高 scalaz-stream 代码的性能?linesGt2linesGt2

这是我的完整测试代码:

0 投票
1 回答
481 浏览

scala - Scalaz 流组排序的数据库结果

我在我的代码中看到了一个常见的模式。我已经对数据库中的结果进行了排序,我需要以嵌套结构发出它们。我希望它能够流式传输,因此我希望一次在内存中拥有尽可能少的记录。使用 TravesableLike.groupBy 假定数据未排序,因此它不必要地填充可变映射。我想保持这种真正的流媒体。scalaz-stream 在这里有用吗?

我没有在 Process 上看到太多函数,如 foldLeft 和 scanLeft,所以我不确定如何检测 grandparent_id、parent_id 或 child_id 何时更改并发出组。有任何想法吗?

0 投票
1 回答
508 浏览

scala - 使用 scalaz 流并行处理多个文件

我正在尝试使用scalaz-stream同时处理多个文件,将单个函数应用于文件中的每一行,跨越所有文件。具体来说,假设我有一个接受字符串列表的函数

还有几个文件,第一个:

第二:

整个过程的结果应该是:

(或者更有可能直接写入其他文件)

事先不知道文件的数量,并且不同文件之间的行数可能会有所不同,但是我可以使用默认值填充(在运行时)较短文件的末端,或者删除较长的文件。

所以本质上,我需要一种方法将 a Seq[Process[Task, String]](通过类似的东西获得io.linesR)组合成一个Process[Task, Seq[String]].

实现这一目标的最简单方法是什么?

或者,更一般地说,我如何将n实例组合Process[F, I]成一个实例Process[F, Seq[I]]

我确信为此目的有一些标准组合器,但我无法弄清楚......

谢谢。

0 投票
2 回答
2726 浏览

scala - Scala 流媒体库差异(Reactive Streams/Iteratee/RxScala/Scalaz...)

我正在关注 Coursera 上的 Scala 函数式反应式编程课程,我们处理 RxScala Observables(基于 RxJava)。

据我所知,Play Iteratee 的库看起来有点像 RxScala Observables,其中 Observables 有点像 Enumerators,Observers 有点像 Iteratees。

还有 Scalaz Stream 库,也许还有其他一些?


所以我想知道所有这些库之间的主要区别。在哪种情况下,一个可能比另一个更好?


PS:我想知道为什么 Martin Odersky 没有为他的课程选择 Play Iteratees 库,因为 Play 在 Typesafe 堆栈中。这是否意味着 Martin 更喜欢 RxScala 而不是 Play Iteratees?


编辑: Reactive Streams计划刚刚宣布,作为尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure

0 投票
1 回答
608 浏览

stream - 您如何组合多个 Scalaz-Streams 以保留完成顺序但不强制执行交错?

您如何获得一个由 2 个进程组合​​而成的进程,但无论它们到达的顺序如何(这意味着如果左边在右边之前两次,没关系,给左边两次,然后当右边到达时发出它)?

我正在寻找睡眠时间较短的睡眠更频繁地发生,并在较慢的过程之前看到它多次出现。提前感谢任何花时间阅读本文的人,尤其是那些可以分享一些见解的人。

0 投票
1 回答
395 浏览

scala - scalaz-stream 中的 Bucketed Sink

我正在尝试制作一个将流写入分桶文件的接收器:当达到特定条件(时间、文件大小等)时,当前输出流将关闭,新输出流将打开到新的桶文件。

我检查了如何在io对象中创建不同的接收器,但没有太多示例。因此,我尝试遵循如何编写resourcechunkW编写。我最终得到了以下代码,为简单起见,桶现在只用一个表示Int,但最终会是某种类型的输出流。

这其中有很多问题:

  1. step在开始时将桶传递一次,并且在递归期间永远不会改变。我不确定如何在递归go中创建一个step将使用前一个任务中的存储桶(Int)的新任务,因为我必须提供一个字符串才能完成该任务。
  2. 调用的fallbackandcleanup没有await收到结果rcv(如果有的话)。在io.resource函数中,由于资源是固定的,它可以正常工作,但是,在我的情况下,资源可能会在任何步骤发生变化。我将如何将对当前打开的存储桶的引用传递给这些回调?
0 投票
2 回答
517 浏览

scala - 运行 scalaz-stream 的 Process 找不到参数 C 的隐含值:scalaz.Catchable[F2]?

为什么我error: could not find implicit value for parameter C: scalaz.Catchable[F2]在执行时得到以下信息P(1,2,3).run

scalaz-stream-sandbox 项目在 GitHub 上可用。执行sbt console然后P(1,2,3).run面对问题。

0 投票
1 回答
5727 浏览

file - Scala快速文本文件读取并上传到内存

在 Scala 中,为了读取文本文件并将其上传到数组中,一种常见的方法是

特别是对于非常大的文件,是否有一种更快的方法,可能是先将字节块读入内存,然后用换行符分割它们?(有关常用方法,请参阅阅读 Scala 中的整个文件。)

非常感谢。

0 投票
1 回答
463 浏览

scala - 什么是等效于 Play Framework 的 Enumerator.fromCallback 的 scalaz-stream

Play Framework 的 iteratee 库定义了一个方法,该方法Enumerator.fromCallback允许基于 Future 的结果生成元素:

http://www.playframework.com/documentation/2.2.x/Enumerators

你可以在这里看到一个很好的例子,它被用来从 Web 服务传递分页结果:

http://engineering.klout.com/2013/01/iteratees-in-big-data-at-klout/

执行相同操作的等效 scalaz-stream 代码是什么?我很确定它可以使用Process.emitorProcess.await或可能来完成Process.eval,但我很想看到一个成功的例子。这可能还需要将 scala Future 提升为 scalaz Task,这里有一个答案:

将 scala 2.10 未来转换为 scalaz.concurrent.Future // 任务

如果它使事情变得更简单,我们可以忽略 scala Future vs scalaz Task 位并假设我们有一个 Task。