问题标签 [fs2]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 将 Either 作为 fs.Stream 处理
如果我有,在构建主应用程序的流时fs2.StreamApp[IO]
如何最好地处理Either
(或)?\/
例如,我为程序构建了一些输入作为两者之一(因为我喜欢 API——它告诉我签名中“可能会出错”)。如果有问题,在这种情况下使用命令行参数,我希望从 main 方法返回一个不同的流。
当我进入主要方法时......
......这一切都变得有点难看。
我可以让析取方法 ( extractArgs
) 只为错误情况返回一个流,但这似乎很糟糕。特别是因为我也不知道如何快速失败。
有没有更惯用的方法将左转变成错误流(在我的情况下,它会打印一个使用命令然后退出应用程序)?
scala - FS2:是否可以优雅地完成队列?
假设我想将一些遗留的异步 API 转换为 FS2 Streams。API 提供了一个带有 3 个回调的接口:下一个元素、成功、错误。我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。
FS2 指南(https://functional-streams-for-scala.github.io/fs2/guide.html)建议fs2.Queue
在这种情况下使用,它非常适合入队,但到目前为止我看到的所有示例都期望返回的流queue.dequeue
永远不会完成 - 在我的情况下,没有明显的方法来处理成功/错误回调。我尝试使用queue.dequeue.interruptWhen(...here goes the signal...)
,但如果成功/错误回调在客户端从流中读取数据之前到达,则流会过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读它们。
FS2可以做到吗?使用 Akka Streams 很简单 -SourceQueueWithComplete
拥有complete
和fail
方法。
更新:通过在 Option 中包装元素并将 None 视为停止读取流的信号,以及使用 Promise 传播错误,我能够获得足够好的结果:
但是,我是否忽略了更自然的做这些事情的方式?
scala - FS2 流一直运行到 InputStream 结束
我对 FS2 很陌生,需要一些关于设计的帮助。我正在尝试设计一个流,它将从底层拉出块InputStream
直到它结束。这是我尝试过的:
并且程序打印唯一的第一个块。这是合理的。但我想找到一种方法来“指示”在哪里停止阅读以及在哪里停止。我的意思是一直打电话read(is)
到结束。有没有办法做到这一点?
我也试过repeatEval(read(is))
,但它一直在读……我需要介于两者之间的东西。
scala - 在 Scala 中处理多个并发流的惯用方法
我有一个流列表,在调用它们next()
时会随机休眠一段时间,然后从不同的来源读取一个字符。
我正在尝试编写一个消费者,它将继续调用这些流,直到EOF
在运行时构建这些流的公共字典。
到目前为止,我使用的ConcurrentHashMap
是字典,并且只是为每个流消费者创建了一个新线程。
monix
虽然我的解决方案有效,但它似乎很天真,我想知道流媒体库是否有更好的用途,例如fs2
scala - 没有可用的同步实例时如何运行 FS2 流?
如果我最终得到一个例如Stream[Id, A]
,我可以用微不足道translate
的效果。Pure
Id ~> Pure
但是,如果我最终得到了一个不同的严格效果类型的流,例如Stream[Option, A]
,我怎样才能将它转换为一个Option[Vector[A]]
(或其他东西)?到目前为止,我发现的最好方法是将流转换为具有实例的其他类型,运行流,但是将其转换回来并不简单,因为承认所有s 都失败,而自然只允许. 解决这个问题的更好方法是什么?Sync
Sync
Throwable
Option
None
scala - 如何使用 FS2 中的分类器功能对对象进行分组?
我有一个 unordered 流measurements
,我想将其分组为固定大小的批次,以便以后可以有效地保留它们:
也就是说,而不是:
我希望批量大小等于以下结构3
:
在 FS2 中是否有一种简单、惯用的方法来实现这一点?我知道有一个groupAdjacentBy函数,但这只会考虑相邻的项目。
我现在在上0.10.5
。
scala - FS2 Stream with StateT[IO, _, _],定期转储状态
我有一个消耗无限数据流的程序。在此过程中,我想记录一些指标,这些指标形成一个幺半群,因为它们只是简单的总和和平均值。定期,我想在某处写出这些指标,清除它们,然后返回累积它们。我基本上有:
因此,大多数执行IO
直接使用并使用StateT.liftF
. 在某些情况下,我会调用一些recordMetric
. 最后我有一个流:
我想定期,说每分钟左右,转储指标,所以我尝试了:
然后我执行通常的顶级程序内容,run
即使用 start 状态调用,然后调用unsafeRunSync
.
问题是,我只看到空指标!我怀疑这与我的 monoid 隐含地提供空指标有关,sendStream
但我无法弄清楚为什么应该这样或如何解决它。也许有一种方法可以将这些sendMetrics
调用“交织”到主流中?
编辑:这是一个最小的完整可运行示例:
现在,如果我这样做:
然后我得到了预期的结果——状态正确地累积到输出中。但如果我这样做:
然后我看到一个空列表一直打印出来。我本来希望打印出部分列表(大约 2 个元素)。
scala - FS2 流到未读 InputStream
我想转换fs2.Stream
为,java.io.InputStream
以便可以将该输入流传递给 http 框架(Finch 和 Akka Http)。
我找到了 a fs2.io.toInputStream
,但这不起作用(它什么也不打印):
据我了解,当我运行.unsafeRunSync()
它时,它会消耗整个流,所以即使它返回一个Seq[InputStream]
底层输入流也已经被消耗了。
有什么方法可以在不消耗的情况下转换fs2.Stream[IO, Byte]
为?java.io.InputStream
谢谢!
scala - 需要有关如何在 fs2 (Scala) 中实现线程安全队列的提示
我需要实现一个微服务,在启动时将大量数据加载到内存中,并通过 HTTP GET 使这些数据可用。我一直将 fs2 视为通过fs2.Queue
.
我担心的是,如果我使用synchronous
来自 的队列fs2
,由于队列的阻塞性质synchronous
(在enqueue
操作中),我提供数据的性能可能会受到负面影响。
这是一个有效的担忧吗?此外,哪些Queue
抽象(在 fs2 中)是线程安全的?即:我可以将任何队列传递给多个线程,并且它们都可以安全地将项目从队列中取出,而不会有多个线程从队列中取出相同的元素吗?
编辑:用例:流服务的 1000 万条记录 -> 许多工作人员(线程)通过 HTTP 端点(GET)从流中挑选工作
scala - 如何将大流分组为子流
我想用最多元素组合Stream[F, A]
成 内部流。Stream[Stream[F, A]]
n
这就是我所做的,基本上是通过管道将块放入Queue[F, Queue[F, Chunk[A]]
,然后将队列元素作为结果流产生。
但这很复杂,有没有更简单的方法?