问题标签 [fs2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
739 浏览

scala - 如何将“writeOutputStream”与 fs2 Stream[IO, Byte] 一起使用

我正在尝试使用fs2.io.writeOutputStream输出到 Java AWS lambda fn。我不知道如何提供它正在寻找的隐式参数:

“没有找到参数 cs 的隐含:ContextShift[IO]”

我找到了一些用于创建我自己的隐式 ContextShift 对象的文档,但这对于我正在尝试做的事情来说似乎有点过分了。

0 投票
1 回答
144 浏览

scala - fs2.Stream observeAsync 不会异步执行给定的接收器

我正在尝试fs2.Stream并发功能,但对它的工作原理有一些误解。我想通过某个接收器并行发送流内容。这是我尝试过的:

//1打印以下内容:

从输出可以看出,每个元素都是通过sink顺序发送的。

但是,如果我按如下方式修改接收器:

输出是:

现在我们有 4 个元素一次通过接收器并行发送(尽管设置3为 的限制observerAsync)。

即使我替换observerAsync为just,observe我也得到了相同的并行化效果。

你能澄清一下水槽的实际工作原理吗?

0 投票
1 回答
173 浏览

scala - Intellij:IO[Long] 类型的表达式不符合预期的 FS2_[O2_] 类型

我尝试了这个博客的代码:a-streaming-library-with-a-superpower-fs2-and-functional-programming

Intellij 中,这段代码:

给了我这个例外:

运行代码sbt没有问题。

有没有办法在Intellij中摆脱这个异常?

0 投票
3 回答
1186 浏览

scala - 将元素从外部推送到 fs2 中的反应流

我有一个外部(即我无法更改)Java API,如下所示:

我需要实现一个Sender接受每个事件,将其转换为 JSON 对象,将其中一些事件收集到一个包中,并通过 HTTP 发送到某个端点。这一切都应该异步完成,不send()阻塞调用线程,使用一些固定大小的缓冲区,如果缓冲区已满,则丢弃新事件。

使用 akka-streams 这很简单:我创建一个阶段图(使用 akka-http 发送 HTTP 请求),将其物化并使用物化ActorRef将新事件推送到流中:

CustomBuffer是一个自定义GraphStage,它与提供的库非常相似,Buffer但根据我们的特定需求量身定制;对于这个特定的问题,这可能无关紧要。

如您所见,与非流代码的流交互非常简单——特征!上的方法ActorRef是异步的,不需要调用任何额外的机制。然后通过整个反应管道处理发送给参与者的每个事件。此外,由于 akka-http 的实现方式,我什至可以免费获得连接池,因此与服务器打开的连接不超过一个。

但是,我找不到正确使用 FS2 执行相同操作的方法。即使放弃缓冲的问题(我可能需要编写一个自定义Pipe实现来完成我们需要的其他事情)和 HTTP 连接池,我仍然坚持更基本的事情——即如何将数据推送到“来自外部”的反应流。

我能找到的所有教程和文档都假设整个程序发生在某个效果上下文中,通常是IO. 这不是我的情况 - send()Java 库在未指定的时间调用该方法。因此,我不能将所有内容都保存在一个IO操作中,我必须在send()方法中完成“推送”操作,并将反应流作为一个单独的实体,因为我想聚合事件并希望池 HTTP 连接(我相信自然地与反应流相关联)。

我假设我需要一些额外的数据结构,比如Queue. fs2 确实有某种fs2.concurrent.Queue,但同样,所有文档都显示了如何在单个IO上下文中使用它,所以我假设做类似的事情

然后queue在流定义中使用,然后在send()方法中单独使用,并进一步unsafeRun调用:

不是正确的方法,很可能甚至行不通。

所以,我的问题是,如何正确使用 fs2 来解决我的问题?

0 投票
1 回答
749 浏览

scala - 如何在 Fs2 中将块流转换为常规流

例如:

chunked = Stream(1, 2, 3).chunks val regular = ???

谢谢你。

0 投票
3 回答
600 浏览

scala - 如何通过 key 对 Fs2 Stream 进行分区以分别转换每个分区?

我想要实现的目标,例如,给定数据:

和转型:

得到:


我已经尝试使用 对它进行分区groupAdjacentBy,但它变得太复杂了,因为我需要使用 Key 保留每个 Chunk 之间的复杂状态。我想知道是否有类似 Flink DataStream 的东西。关键?或者更简单的实现方式?

0 投票
1 回答
600 浏览

scala - 新版本中 fs2 (0.10.x) 中的 fs2.Scheduler 是什么

我正在将 fs2 0.10.x 版本迁移到 1.0.0 版本。

我们的代码fs2.Scheduler从 fs2 版本 0.10.x 开始使用。

我不知道较新的 fs2 版本 1.0.0 中的等价物是什么。

我在这里浏览了迁移指南,但找不到从 0.10.x 到 1.0.0 或类似内容的迁移指南。

提前致谢。

0 投票
1 回答
514 浏览

scala - fs2 流:测试 fs2 服务器

我有一个这种形式的流:

我知道为了运行它,我应该执行以下操作:

现在,我想为此服务器编写单元测试,但我只能在服务器全部设置好后运行我的测试。目前,在我的beforeAll街区,我有:

在我的afterAll街区,我有:

在我的测试后关闭服务器。

我想知道是否可以使用信号来实现我的既定目标,以便我可以以某种方式“等待”信号,以便仅在信号出现时运行测试。或者,我很想知道这样做的通用/惯用方式。

0 投票
0 回答
330 浏览

scala - fs2-kafka 消费者获取类型不匹配错误发现 fs2.stream 需要cats.effect.IO

我一直在关注fs2-kafka的示例。但是,我非常坚持以消费者为例。我得到的问题是 fs2.stream 和cats.effect.IO 之间的类型不匹配(下面的错误)

代码:注意:现在根据@AlexeyNovakov 的建议进行了更新,以提供一个工作示例

我似乎在编译时遇到的错误是:

谁能提供任何见解来帮助我理解和修复这个神秘的错误?我尝试了各种尝试来解决这个问题,但无济于事。任何帮助将不胜感激,因为我似乎无法在谷歌上搜索类似的情况。

0 投票
0 回答
652 浏览

fs2 - fs2 流到 zip 压缩的 fs2stream

我有一个 fs2 流,我想创建一个压缩流,准备好写入带有*.zip扩展名的文件或下载。

问题是流永远不会终止。这是代码:

该代码是从https://github.com/eikek/sharry/blob/master/modules/common/src/main/scala/sharry/common/zip.scala无耻地复制 并更新以使用最新的fs2cats-effect

我将问题缩小到enqueueChunkSync

哪个块在最后一个块上。当我放在println那里并使缓冲区变小时,我看到块已成功刷新,直到最后一个。
当我删除阻塞位done.get.fold(throw _, identity)时,它似乎可以工作,但是我想这些字节会一次全部刷新到流中?
最后一个块与以前的块有什么不同?