问题标签 [fs2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
305 浏览

scala - FS2 动态创建的订阅者

我在 fs2 中使用 pub-sub 模式。我在处理消息流时动态创建主题和订阅者。出于某种原因,我的订阅者只收到初始消息,但进一步发布的消息永远不会到达订阅者

订阅者代码很简单:

输出如下:

我没有看到发布到现有主题的消息。另外,我想知道是否有更好的方法来启动订阅者的异步流,而不是Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)

0 投票
1 回答
3268 浏览

scala - flatMap、flatTap、evalMap 和 evalTap 的区别

在用于功能流的 Scala fs2 库中:

我试图理解flatMap,flatTap和之间evalMap的区别evalTap。它们似乎都执行相同的操作,即流值的转换。

有什么区别,什么时候应该使用它们?

0 投票
1 回答
966 浏览

scala - 如何推理 Scala Cats / fs2 中的堆栈安全性?

这是fs2文档中的一段代码。该函数go是递归的。问题是我们如何知道它是否是堆栈安全的,以及如何推断任何函数是否是堆栈安全的?

如果我们go从另一个方法调用它也会是堆栈安全的吗?

0 投票
1 回答
1520 浏览

scala - 如何在 fs2 中“拆分”流?

我想做这样的事情:

但这不起作用,因为它从源头“拉”两次 - 当我同时耗尽stream和时,每次一次stream.map(split)。我该如何防止这种情况?通过维护我自己的内部缓冲区以某种方式切换到基于“推”的模型,这样我就不会拉两次?

0 投票
1 回答
622 浏览

scala - 使用 fs2 将流值列表转换为值流

我想使用fs2 Streams、cats EitherTcats-effect IO 定义一个具有以下签名的函数。

它将每个值从 vs 映射到一个值流,并将所有这些值收集到一个新流中。

我试过类似的东西:

但似乎 Stream 没有隐含的定义。

0 投票
1 回答
193 浏览

scala - fs2.Stream[IO,Something] 不返回 take(1)

我有一个返回 fs2.Stream 的函数。

通常它是一个无限流,除非我给它传递一个测试标志,在这种情况下它应该输出一个值并停止。

无限流工作正常。它无限地给了我所有的测量值。测试流确实只给了我第一次测量,仅此而已。我遇到的问题是在最后一次测量之后 Stream 没有返回。它永远阻塞。我究竟做错了什么?

注意:我认为我抽象了基本代码,但有关更多上下文,请查看我的项目:https ://github.com/jkransen/fijnstof/blob/ZIO/src/main/scala/nl/kransen/fijnstof /Main.scala

0 投票
0 回答
78 浏览

scala - 在方法的引擎盖下运行 fs2.Stream?

我只是在学习 FP,所以也许我这样做完全错误。

使用猫、猫效果、fs2、fs2-io。

以下代码将端口作为参数,然后在给定端口上创建连接到 127.0.0.1 的服务器套接字和客户端套接字。

我需要这个函数返回的是分配的服务器套接字端口。

Int => F[Int]问题:在方法的引擎盖下运行 fs2.Stream 时我该怎么做?

我正在使用cats-effects IOApp 来运行该程序。

我的代码:

0 投票
1 回答
744 浏览

scala - FS2 将资源(或效果)作为状态传递

我正在尝试实现一个控制相机的应用程序。相机命令表示为 CameraAction 对象流:

假设我有一个测试流,它发出“记录”并在 20 秒后发出“停止”,再过 20 秒后发出另一个“记录”消息,依此类推,输入流是无限的。

然后应用程序使用“记录”它应该创建一个 GStreamer 管道实例(即它是一种效果)并“运行”它,在“停止”时它“停止”管道并关闭它。然后在随后的“记录”中,使用新的 GStreamer 管道重复该模式。

问题是我需要在流事件的句柄之间传递一个不纯的可变对象的实例。

FS2 文档建议使用块来使流有状态,所以我尝试了

这段代码的问题是,实际录制不会发生在“录制”事件上,而是评估整个块的效果,即当“停止录制”消息到达时,相机会打开,然后立即再次关闭。

如何在不分块的情况下传递“状态”?或者有没有其他方法可以达到我需要的结果?

这可能类似于 带有 StateT[IO, _, _] 的 FS2 Stream,定期转储状态 ,但不同之处在于,在我的情况下,状态不是纯数据结构而是资源。

0 投票
1 回答
96 浏览

scala - Scala fs2:FreeC 和代数做什么?

当我进入 fs2 流操作的实现时,我经常在那里看到fromFreeCAlgebra使用。

究竟fromFreeCAlgebra做什么?为什么以及何时需要它们?

这是一个示例,一个 fs2 流操作eval,它评估效果并返回一个有效的流。

0 投票
1 回答
215 浏览

scala - FS2 中的块文件轮换

我有一大串字符串。现在我想将这些块写入文件。在每个文件中我想写 N 个块。我写了下面的代码来解决这个问题。但我的方法肯定不是惯用的。

1)所以我unsafeRunSync()不止一次打电话这是一个缺陷(因为地图被多次调用)。但我不知道如何以另一种方式解决它。我是 FS2 和 Cats-Effects 的新手。

2)另外,我在chunkN(4)这里打电话。在我的例子中,这看起来是无害的。但在我的实际用例中,我会使用chunkN(10 ^ 6). 我是否Chunks从文档中正确理解 FS2 会在内存中累积 10^6 块,然后将它们发送到下游?

我知道有一个 fileRotate 函数,它可以和 Bytes 一样工作,limit但我认为它在我的用例中并没有真正的帮助。

谢谢