问题标签 [fs2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
673 浏览

scala - 跳过无限流中的错误

我有一个无限fs2.Stream可能会遇到错误。我想跳过那些什么都不做(可能是日志)的错误并继续流式传输更多元素。例子:

在这种特定情况下,我希望每秒发射无限fs2.StreamLeft(new RuntimeException)

有一种Stream.attempt方法可以生成在遇到第一个错误后终止的流。有没有办法跳过错误并继续拉更多元素?

通常IO.raiseError(new RuntimeException).attempt不会起作用,因为它需要在流管道组合的所有位置尝试所有效果。

0 投票
1 回答
472 浏览

scala - 将元素流分组为多个流

假设我们有一个case class MyCaseClass(name: String, value: Int). 给定一个fs2.Stream[F, MyCaseClass]我想对相同的元素进行分组name

我需要这样做的原因是我想应用有效的转换

到流的所有元素,如果一组失败,另一组应该继续工作。

这样的事情可能吗?

0 投票
2 回答
671 浏览

scala - 为什么与空 fs2.Stream 合并会改变程序的行为

有据可查的是,与空合并fs2.Stream应该产生相同的fs2.Stream. 这是Scaladocs的引文:

拥有的财产merge(Stream.empty, s) == s

考虑以下完整的 Scala 程序fs2.Stream

发射元素

该程序打印以下内容:

它看起来不错。现在应用上面的引用Scaladoc,我得出结论,替换

行为应该是相同的。这是更新的程序:

发射元素并与空 fs2.Stream 合并

程序输出为

问题:为什么与空fs2.Stream更改合并程序的行为会导致复制原始元素fs2.Stream

0 投票
1 回答
296 浏览

scala - 构建一个 ZIO 和 http4s 应用程序,与 sbt 一起工作,在 Bazel 中失败:缺少一个隐式

我正在尝试构建一个集成 ZIO 和 http4s 的服务。

起点是这个例子(它使用 zio 1.0.1、http4s 0.21.3、scala 2.12.11)

我能够使用 构建下面的代码而没有任何问题sbt,但是在尝试使用 Bazel 构建时遇到了麻烦:

Sbt 很高兴,但是当我用 Bazel 构建它时:

关于 bazel 设置:我使用rules_scalafrom HigherkindnessBUILD文件如下:

当涉及到隐式时,我并不太了解,我想知道“魔法酱”的哪一部分缺少让这个在 Bazel 中工作。到目前为止,我有两个假设:

  • 我错过了一个我需要在某处明确指定的依赖项,当使用 sbt 构建时它在类路径上,而在 Bazel 中丢失
  • 整个事情取决于宏,我知道这在我的设置中可能有问题

因此,我有一个基本问题:任何人都可以对正在发生的魔法有所启发,让编译器compile在上面的示例代码中找到正确的隐式传递给?

0 投票
2 回答
964 浏览

scala - 将 fs2 流输出拆分为两个文件

我刚刚开始使用 fs2 流进行冒险。我想要实现的是读取一个文件(一个大文件,这就是我使用 fs2 的原因),对其进行转换并将结果写入两个不同的文件(基于一些谓词)。一些代码(来自https://github.com/typelevel/fs2),带有我的评论:

最有效的方法是什么?显而易见的解决方案是让两个流具有不同的过滤器,但效率低下(将有两次通过)。

0 投票
1 回答
581 浏览

scala - 提高涉及文件转换的 fs2 流的性能

我有这样的东西(这是https://github.com/typelevel/fs2的一个例子,我的补充,我用评论标记):

如果fahrenheit.txt与例如一样大。300mb 原始代码的执行需要几分钟。看来我的代码并没有更快。我怎样才能提高它的性能?运行时有大量未使用的CPU电源,磁盘是SSD,所以我不知道为什么它这么慢。我不确定我balance是否正确使用。

0 投票
0 回答
204 浏览

scala - 如何登录 fs2 Stream.takeWhile

我们有一个使用 fs2-kafka 从 kafka 中提取的事件流,并且当事件比给定的截止日期更新或偏移量位于分区末尾时,我们正在完成处理(这对于问题来说并不重要,但要提供有关该程序的一些上下文)。

理想情况下,我们希望在满足这些条件时记录,但takeWhile需要takeThrough纯函数O => Boolean

我们的流是:

显然我们可以做一个log.info内部shouldProcessatLongEndOffset但这意味着在一个纯函数内部产生副作用,我们不希望这样做。

如果不调用两次函数(一种用于记录,另一种用于条件评估),哪种方法会更好?

谢谢!

0 投票
2 回答
723 浏览

scala - 如何异步中断 fs2 流?

我正在尝试使用 SignalRef 中断 fs2 流。我使用以下内容设置并运行流。流应该在包含时运行,并在包含switchfalse中断switchtrue

然后我尝试用

但是,流仍在继续。在标准输出中我看到

所以显然它没有开始切换到 true 吗?

0 投票
0 回答
107 浏览

scala - 一些消费者只获得一个分区和一些消费者数据的所有分区的相同消费者组ID

我有 4 个消费者使用消费者组 ID 为一个主题消费数据。在 4 个消费者中,其中 2 个获得了分配给多个分区的仅一个分区的数据,其中 2 个获得了分配给该消费者的所有分区的数据。

如何让消费者获得分配给它的所有分区的数据?

我正在使用 fs2-kafka。

0 投票
1 回答
397 浏览

scala - 使用 fs2 处理带有内部流的流

我有 2 个带有排序数据的 csv 文件:文件 1:排序的数字(~1GB) 文件 2:排序的数字 + 额外数据(~20GB)

我需要在文件 2 中查找文件 1 中的所有数字并进行一些处理(跳过文件 2 中不存在于文件 1 中的数字)。

到目前为止,我有:

哪个打印:

这意味着第二个流为文件 1 中的每个值重新启动,我如何保持上次读取的位置并从那里开始?数字是排序的,所以没有点重新开始。我对 scala 和 fs2 非常陌生,因此我将不胜感激解释我的误解。

谢谢!