我正在尝试实现一个控制相机的应用程序。相机命令表示为 CameraAction 对象流:
sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage
...
val s = Stream[F, CameraMessage]
假设我有一个测试流,它发出“记录”并在 20 秒后发出“停止”,再过 20 秒后发出另一个“记录”消息,依此类推,输入流是无限的。
然后应用程序使用“记录”它应该创建一个 GStreamer 管道实例(即它是一种效果)并“运行”它,在“停止”时它“停止”管道并关闭它。然后在随后的“记录”中,使用新的 GStreamer 管道重复该模式。
问题是我需要在流事件的句柄之间传递一个不纯的可变对象的实例。
FS2 文档建议使用块来使流有状态,所以我尝试了
def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] =
{
... create and open pipeline ...
}
def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
... stop pipeline, release resources ...
}
def effectPipe(pipelineDef: String)(implicit L: Logger[F]):
Pipe[F, CameraMessage, F[Unit]] = {
type CameraSessionHandle = Pipeline
type CameraStream = Stream[F, CameraSessionHandle]
s: Stream[F, CameraMessage] =>
s.scanChunks(Stream[F, CameraSessionHandle]()) {
case (s: CameraStream, c: Chunk[CameraMessage]) =>
c.last match {
case Some(Record(fileName)) =>
(Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
case Some(StopRecording) =>
(Stream.empty, Chunk(s.compile.drain))
case _ =>
(s, Chunk.empty)
}
}
}
这段代码的问题是,实际录制不会发生在“录制”事件上,而是评估整个块的效果,即当“停止录制”消息到达时,相机会打开,然后立即再次关闭。
如何在不分块的情况下传递“状态”?或者有没有其他方法可以达到我需要的结果?
这可能类似于 带有 StateT[IO, _, _] 的 FS2 Stream,定期转储状态 ,但不同之处在于,在我的情况下,状态不是纯数据结构而是资源。