问题标签 [fs2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
485 浏览

scala - 将 Either 作为 fs.Stream 处理

如果我有,在构建主应用程序的流时fs2.StreamApp[IO]如何最好地处理Either(或)?\/

例如,我为程序构建了一些输入作为两者之一(因为我喜欢 API——它告诉我签名中“可能会出错”)。如果有问题,在这种情况下使用命令行参数,我希望从 main 方法返回一个不同的流。

当我进入主要方法时......

......这一切都变得有点难看。

我可以让析取方法 ( extractArgs) 只为错误情况返回一个流,但这似乎很糟糕。特别是因为我也不知道如何快速失败。

有没有更惯用的方法将左转变成错误流(在我的情况下,它会打印一个使用命令然后退出应用程序)?

0 投票
2 回答
1225 浏览

scala - FS2:是否可以优雅地完成队列?

假设我想将一些遗留的异步 API 转换为 FS2 Streams。API 提供了一个带有 3 个回调的接口:下一个元素、成功、错误。我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。

FS2 指南(https://functional-streams-for-scala.github.io/fs2/guide.html)建议fs2.Queue在这种情况下使用,它非常适合入队,但到目前为止我看到的所有示例都期望返回的流queue.dequeue永远不会完成 - 在我的情况下,没有明显的方法来处理成功/错误回调。我尝试使用queue.dequeue.interruptWhen(...here goes the signal...),但如果成功/错误回调在客户端从流中读取数据之前到达,则流会过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读它们。

FS2可以做到吗?使用 Akka Streams 很简单 -SourceQueueWithComplete拥有completefail方法。

更新:通过在 Option 中包装元素并将 None 视为停止读取流的信号,以及使用 Promise 传播错误,我能够获得足够好的结果:

但是,我是否忽略了更自然的做这些事情的方式?

0 投票
1 回答
1209 浏览

scala - FS2 流一直运行到 InputStream 结束

我对 FS2 很陌生,需要一些关于设计的帮助。我正在尝试设计一个流,它将从底层拉出块InputStream直到它结束。这是我尝试过的:

并且程序打印唯一的第一个块。这是合理的。但我想找到一种方法来“指示”在哪里停止阅读以及在哪里停止。我的意思是一直打电话read(is)到结束。有没有办法做到这一点?

我也试过repeatEval(read(is)) ,但它一直在读……我需要介于两者之间的东西。

0 投票
1 回答
301 浏览

scala - 在 Scala 中处理多个并发流的惯用方法

我有一个流列表,在调用它们next()时会随机休眠一段时间,然后从不同的来源读取一个字符。

我正在尝试编写一个消费者,它将继续调用这些流,直到EOF在运行时构建这些流的公共字典。

到目前为止,我使用的ConcurrentHashMap是字典,并且只是为每个流消费者创建了一个新线程。

monix虽然我的解决方案有效,但它似乎很天真,我想知道流媒体库是否有更好的用途,例如fs2

0 投票
0 回答
127 浏览

scala - 没有可用的同步实例时如何运行 FS2 流?

如果我最终得到一个例如Stream[Id, A],我可以用微不足道translate的效果。PureId ~> Pure

但是,如果我最终得到了一个不同的严格效果类型的流,例如Stream[Option, A],我怎样才能将它转换为一个Option[Vector[A]](或其他东西)?到目前为止,我发现的最好方法是将流转换为具有实例的其他类型,运行流,但是将其转换回来并不简单,因为承认所有s 都失败,而自然只允许. 解决这个问题的更好方法是什么?SyncSyncThrowableOptionNone

0 投票
1 回答
768 浏览

scala - 如何使用 FS2 中的分类器功能对对象进行分组?

我有一个 unordered 流measurements,我想将其分组为固定大小的批次,以便以后可以有效地保留它们:

也就是说,而不是:

我希望批量大小等于以下结构3

在 FS2 中是否有一种简单、惯用的方法来实现这一点?我知道有一个groupAdjacentBy函数,但这只会考虑相邻的项目。

我现在在上0.10.5

0 投票
1 回答
682 浏览

scala - FS2 Stream with StateT[IO, _, _],定期转储状态

我有一个消耗无限数据流的程序。在此过程中,我想记录一些指标,这些指标形成一个幺半群,因为它们只是简单的总和和平均值。定期,我想在某处写出这些指标,清除它们,然后返回累积它们。我基本上有:

因此,大多数执行IO直接使用并使用StateT.liftF. 在某些情况下,我会调用一些recordMetric. 最后我有一个流:

我想定期,说每分钟左右,转储指标,所以我尝试了:

然后我执行通常的顶级程序内容,run即使用 start 状态调用,然后调用unsafeRunSync.

问题是,我只看到空指标!我怀疑这与我的 monoid 隐含地提供空指标有关,sendStream但我无法弄清楚为什么应该这样或如何解决它。也许有一种方法可以将这些sendMetrics调用“交织”到主流中?

编辑:这是一个最小的完整可运行示例

现在,如果我这样做:

然后我得到了预期的结果——状态正确地累积到输出中。但如果我这样做:

然后我看到一个空列表一直打印出来。我本来希望打印出部分列表(大约 2 个元素)。

0 投票
2 回答
1456 浏览

scala - FS2 流到未读 InputStream

我想转换fs2.Stream为,java.io.InputStream以便可以将该输入流传递给 http 框架(Finch 和 Akka Http)。

我找到了 a fs2.io.toInputStream,但这不起作用(它什么也不打印):

据我了解,当我运行.unsafeRunSync()它时,它会消耗整个流,所以即使它返回一个Seq[InputStream]底层输入流也已经被消耗了。

有什么方法可以在不消耗的情况下转换fs2.Stream[IO, Byte]为?java.io.InputStream

谢谢!

0 投票
0 回答
245 浏览

scala - 需要有关如何在 fs2 (Scala) 中实现线程安全队列的提示

我需要实现一个微服务,在启动时将大量数据加载到内存中,并通过 HTTP GET 使这些数据可用。我一直将 fs2 视为通过fs2.Queue.

我担心的是,如果我使用synchronous来自 的队列fs2,由于队列的阻塞性质synchronous(在enqueue操作中),我提供数据的性能可能会受到负面影响。

这是一个有效的担忧吗?此外,哪些Queue抽象(在 fs2 中)是线程安全的?即:我可以将任何队列传递给多个线程,并且它们都可以安全地将项目从队列中取出,而不会有多个线程从队列中取出相同的元素吗?

编辑:用例:流服务的 1000 万条记录 -> 许多工作人员(线程)通过 HTTP 端点(GET)从流中挑选工作

0 投票
3 回答
1430 浏览

scala - 如何将大流分组为子流

我想用最多元素组合Stream[F, A]成 内部流。Stream[Stream[F, A]]n

这就是我所做的,基本上是通过管道将块放入Queue[F, Queue[F, Chunk[A]],然后将队列元素作为结果流产生。

但这很复杂,有没有更简单的方法?