4

假设我想将一些遗留的异步 API 转换为 FS2 Streams。API 提供了一个带有 3 个回调的接口:下一个元素、成功、错误。我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。

FS2 指南(https://functional-streams-for-scala.github.io/fs2/guide.html)建议fs2.Queue在这种情况下使用,它非常适合入队,但到目前为止我看到的所有示例都期望返回的流queue.dequeue永远不会完成 - 在我的情况下,没有明显的方法来处理成功/错误回调。我尝试使用queue.dequeue.interruptWhen(...here goes the signal...),但如果成功/错误回调在客户端从流中读取数据之前到达,则流会过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读它们。

FS2可以做到吗?使用 Akka Streams 很简单 -SourceQueueWithComplete拥有completefail方法。

更新:通过在 Option 中包装元素并将 None 视为停止读取流的信号,以及使用 Promise 传播错误,我能够获得足够好的结果:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

但是,我是否忽略了更自然的做这些事情的方式?

4

2 回答 2

7

一种惯用的方法是创建一个Queue[Option[A]]而不是Queue[A]. 入队时,换行Some,您可以显式入队None以表示完成。在出队方面, do q.dequeue.unNoneTerminate,它会为您提供一个Stream[F, A]在 Queue 发出后终止的None

于 2018-06-19T19:54:27.660 回答
7

对您的更新的回答:结合unNoneTerminatewith rethrow,它接受 a并在遇到 throwable 时Stream[F, Either[Throwable, A]]返回Stream[F, A]错误。Stream.raiseError

然后,您的完整堆栈将是 aStream[F, Either[Throwable, Option[A]]]并且您Stream[F,A]通过调用展开.rethrow.unNoneTerminate

于 2018-08-08T15:52:52.353 回答