问题标签 [scalaz-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 使用 Scalaz Stream 解析任务(替换 Scalaz Iteratees)
介绍
我在许多项目中使用Scalaz 7的迭代器,主要用于处理大型文件。我想开始切换到 Scalaz流,它旨在替换 iteratee 包(坦率地说,它缺少很多部分并且使用起来有点痛苦)。
流基于机器(迭代思想的另一种变体),它也已在 Haskell 中实现。我使用了一点 Haskell 机器库,但是机器和流之间的关系并不完全明显(至少对我而言),流库的文档仍然有点稀疏。
这个问题是关于一个简单的解析任务,我希望看到使用流而不是迭代器来实现。如果没有其他人超过我,我会自己回答这个问题,但我确信我不是唯一一个正在进行(或至少考虑)这种转变的人,因为无论如何我都需要完成这个练习,我想我还不如在公共场合做。
任务
假设我有一个文件,其中包含已标记化并用词性标记的句子:
每行有一个标记,单词和词性由一个空格分隔,空行表示句子边界。我想解析这个文件并返回一个句子列表,我们不妨将其表示为字符串元组的列表:
像往常一样,如果我们遇到无效输入或文件读取异常,我们希望优雅地失败,我们不想担心手动关闭资源等。
一个迭代的解决方案
首先是一些通用的文件读取内容(它确实应该是 iteratee 包的一部分,它目前不提供任何远程这个高级别的东西):
然后我们的句子阅读器:
最后是我们的解析动作:
我们可以证明它有效:
我们完成了。
我想要的是
或多或少是使用 Scalaz 流而不是迭代器实现的相同程序。
performance - 使用 scalaz-stream 计算行数的性能
我已将Scala 函数式编程linesGt1
第 15 章开头的命令式行计数代码(请参阅 参考资料)翻译成使用scalaz-stream的解决方案(请参阅参考资料)。然而,它的表现并不是那么出色。命令式代码比我的 scalaz-stream 解决方案快大约 30 倍。所以我想我在做一些根本错误的事情。如何提高 scalaz-stream 代码的性能?linesGt2
linesGt2
这是我的完整测试代码:
scala - Scalaz 流组排序的数据库结果
我在我的代码中看到了一个常见的模式。我已经对数据库中的结果进行了排序,我需要以嵌套结构发出它们。我希望它能够流式传输,因此我希望一次在内存中拥有尽可能少的记录。使用 TravesableLike.groupBy 假定数据未排序,因此它不必要地填充可变映射。我想保持这种真正的流媒体。scalaz-stream 在这里有用吗?
我没有在 Process 上看到太多函数,如 foldLeft 和 scanLeft,所以我不确定如何检测 grandparent_id、parent_id 或 child_id 何时更改并发出组。有任何想法吗?
scala - 使用 scalaz 流并行处理多个文件
我正在尝试使用scalaz-stream
同时处理多个文件,将单个函数应用于文件中的每一行,跨越所有文件。具体来说,假设我有一个接受字符串列表的函数
还有几个文件,第一个:
第二:
整个过程的结果应该是:
(或者更有可能直接写入其他文件)
事先不知道文件的数量,并且不同文件之间的行数可能会有所不同,但是我可以使用默认值填充(在运行时)较短文件的末端,或者删除较长的文件。
所以本质上,我需要一种方法将 a Seq[Process[Task, String]]
(通过类似的东西获得io.linesR
)组合成一个Process[Task, Seq[String]]
.
实现这一目标的最简单方法是什么?
或者,更一般地说,我如何将n
实例组合Process[F, I]
成一个实例Process[F, Seq[I]]
?
我确信为此目的有一些标准组合器,但我无法弄清楚......
谢谢。
scala - Scala 流媒体库差异(Reactive Streams/Iteratee/RxScala/Scalaz...)
我正在关注 Coursera 上的 Scala 函数式反应式编程课程,我们处理 RxScala Observables(基于 RxJava)。
据我所知,Play Iteratee 的库看起来有点像 RxScala Observables,其中 Observables 有点像 Enumerators,Observers 有点像 Iteratees。
还有 Scalaz Stream 库,也许还有其他一些?
所以我想知道所有这些库之间的主要区别。在哪种情况下,一个可能比另一个更好?
PS:我想知道为什么 Martin Odersky 没有为他的课程选择 Play Iteratees 库,因为 Play 在 Typesafe 堆栈中。这是否意味着 Martin 更喜欢 RxScala 而不是 Play Iteratees?
编辑: Reactive Streams计划刚刚宣布,作为尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure
stream - 您如何组合多个 Scalaz-Streams 以保留完成顺序但不强制执行交错?
您如何获得一个由 2 个进程组合而成的进程,但无论它们到达的顺序如何(这意味着如果左边在右边之前两次,没关系,给左边两次,然后当右边到达时发出它)?
我正在寻找睡眠时间较短的睡眠更频繁地发生,并在较慢的过程之前看到它多次出现。提前感谢任何花时间阅读本文的人,尤其是那些可以分享一些见解的人。
scala - scalaz-stream 中的 Bucketed Sink
我正在尝试制作一个将流写入分桶文件的接收器:当达到特定条件(时间、文件大小等)时,当前输出流将关闭,新输出流将打开到新的桶文件。
我检查了如何在io
对象中创建不同的接收器,但没有太多示例。因此,我尝试遵循如何编写resource
和chunkW
编写。我最终得到了以下代码,为简单起见,桶现在只用一个表示Int
,但最终会是某种类型的输出流。
这其中有很多问题:
step
在开始时将桶传递一次,并且在递归期间永远不会改变。我不确定如何在递归go
中创建一个step
将使用前一个任务中的存储桶(Int)的新任务,因为我必须提供一个字符串才能完成该任务。- 调用的
fallback
andcleanup
没有await
收到结果rcv
(如果有的话)。在io.resource
函数中,由于资源是固定的,它可以正常工作,但是,在我的情况下,资源可能会在任何步骤发生变化。我将如何将对当前打开的存储桶的引用传递给这些回调?
scala - 运行 scalaz-stream 的 Process 找不到参数 C 的隐含值:scalaz.Catchable[F2]?
为什么我error: could not find implicit value for parameter C: scalaz.Catchable[F2]
在执行时得到以下信息P(1,2,3).run
?
scalaz-stream-sandbox 项目在 GitHub 上可用。执行sbt console
然后P(1,2,3).run
面对问题。
file - Scala快速文本文件读取并上传到内存
在 Scala 中,为了读取文本文件并将其上传到数组中,一种常见的方法是
特别是对于非常大的文件,是否有一种更快的方法,可能是先将字节块读入内存,然后用换行符分割它们?(有关常用方法,请参阅阅读 Scala 中的整个文件。)
非常感谢。
scala - 什么是等效于 Play Framework 的 Enumerator.fromCallback 的 scalaz-stream
Play Framework 的 iteratee 库定义了一个方法,该方法Enumerator.fromCallback
允许基于 Future 的结果生成元素:
http://www.playframework.com/documentation/2.2.x/Enumerators
你可以在这里看到一个很好的例子,它被用来从 Web 服务传递分页结果:
http://engineering.klout.com/2013/01/iteratees-in-big-data-at-klout/
执行相同操作的等效 scalaz-stream 代码是什么?我很确定它可以使用Process.emit
orProcess.await
或可能来完成Process.eval
,但我很想看到一个成功的例子。这可能还需要将 scala Future 提升为 scalaz Task,这里有一个答案:
将 scala 2.10 未来转换为 scalaz.concurrent.Future // 任务
如果它使事情变得更简单,我们可以忽略 scala Future vs scalaz Task 位并假设我们有一个 Task。