问题标签 [fs2]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - FS2 动态创建的订阅者
我在 fs2 中使用 pub-sub 模式。我在处理消息流时动态创建主题和订阅者。出于某种原因,我的订阅者只收到初始消息,但进一步发布的消息永远不会到达订阅者
订阅者代码很简单:
输出如下:
我没有看到发布到现有主题的消息。另外,我想知道是否有更好的方法来启动订阅者的异步流,而不是Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)
scala - flatMap、flatTap、evalMap 和 evalTap 的区别
在用于功能流的 Scala fs2 库中:
我试图理解flatMap
,flatTap
和之间evalMap
的区别evalTap
。它们似乎都执行相同的操作,即流值的转换。
有什么区别,什么时候应该使用它们?
scala - 如何推理 Scala Cats / fs2 中的堆栈安全性?
这是fs2文档中的一段代码。该函数go
是递归的。问题是我们如何知道它是否是堆栈安全的,以及如何推断任何函数是否是堆栈安全的?
如果我们go
从另一个方法调用它也会是堆栈安全的吗?
scala - 如何在 fs2 中“拆分”流?
我想做这样的事情:
但这不起作用,因为它从源头“拉”两次 - 当我同时耗尽stream
和时,每次一次stream.map(split)
。我该如何防止这种情况?通过维护我自己的内部缓冲区以某种方式切换到基于“推”的模型,这样我就不会拉两次?
scala - 使用 fs2 将流值列表转换为值流
我想使用fs2 Streams、cats EitherT和cats-effect IO 定义一个具有以下签名的函数。
它将每个值从 vs 映射到一个值流,并将所有这些值收集到一个新流中。
我试过类似的东西:
但似乎 Stream 没有隐含的定义。
scala - fs2.Stream[IO,Something] 不返回 take(1)
我有一个返回 fs2.Stream 的函数。
通常它是一个无限流,除非我给它传递一个测试标志,在这种情况下它应该输出一个值并停止。
无限流工作正常。它无限地给了我所有的测量值。测试流确实只给了我第一次测量,仅此而已。我遇到的问题是在最后一次测量之后 Stream 没有返回。它永远阻塞。我究竟做错了什么?
注意:我认为我抽象了基本代码,但有关更多上下文,请查看我的项目:https ://github.com/jkransen/fijnstof/blob/ZIO/src/main/scala/nl/kransen/fijnstof /Main.scala
scala - 在方法的引擎盖下运行 fs2.Stream?
我只是在学习 FP,所以也许我这样做完全错误。
使用猫、猫效果、fs2、fs2-io。
以下代码将端口作为参数,然后在给定端口上创建连接到 127.0.0.1 的服务器套接字和客户端套接字。
我需要这个函数返回的是分配的服务器套接字端口。
Int => F[Int]
问题:在方法的引擎盖下运行 fs2.Stream 时我该怎么做?
我正在使用cats-effects IOApp 来运行该程序。
我的代码:
scala - FS2 将资源(或效果)作为状态传递
我正在尝试实现一个控制相机的应用程序。相机命令表示为 CameraAction 对象流:
假设我有一个测试流,它发出“记录”并在 20 秒后发出“停止”,再过 20 秒后发出另一个“记录”消息,依此类推,输入流是无限的。
然后应用程序使用“记录”它应该创建一个 GStreamer 管道实例(即它是一种效果)并“运行”它,在“停止”时它“停止”管道并关闭它。然后在随后的“记录”中,使用新的 GStreamer 管道重复该模式。
问题是我需要在流事件的句柄之间传递一个不纯的可变对象的实例。
FS2 文档建议使用块来使流有状态,所以我尝试了
这段代码的问题是,实际录制不会发生在“录制”事件上,而是评估整个块的效果,即当“停止录制”消息到达时,相机会打开,然后立即再次关闭。
如何在不分块的情况下传递“状态”?或者有没有其他方法可以达到我需要的结果?
这可能类似于 带有 StateT[IO, _, _] 的 FS2 Stream,定期转储状态 ,但不同之处在于,在我的情况下,状态不是纯数据结构而是资源。
scala - Scala fs2:FreeC 和代数做什么?
当我进入 fs2 流操作的实现时,我经常在那里看到fromFreeC
和Algebra
使用。
究竟fromFreeC
和Algebra
做什么?为什么以及何时需要它们?
这是一个示例,一个 fs2 流操作eval
,它评估效果并返回一个有效的流。
scala - FS2 中的块文件轮换
我有一大串字符串。现在我想将这些块写入文件。在每个文件中我想写 N 个块。我写了下面的代码来解决这个问题。但我的方法肯定不是惯用的。
1)所以我unsafeRunSync()
不止一次打电话这是一个缺陷(因为地图被多次调用)。但我不知道如何以另一种方式解决它。我是 FS2 和 Cats-Effects 的新手。
2)另外,我在chunkN(4)
这里打电话。在我的例子中,这看起来是无害的。但在我的实际用例中,我会使用chunkN(10 ^ 6)
. 我是否Chunks
从文档中正确理解 FS2 会在内存中累积 10^6 块,然后将它们发送到下游?
我知道有一个 fileRotate 函数,它可以和 Bytes 一样工作,limit
但我认为它在我的用例中并没有真正的帮助。
谢谢