我正在使用 Scala 的FS2 流库。
我有一个Stream[F, [Stream[F, A]]
内部流和外部流都是无限的(带有适当Async
的实例F
)。我想最终得到一个Stream[F, A]
同时从外部流和内部流中提取的结果,其中外部流中的每个新元素都会替换我从中提取的当前内部流。特别是我想“最终”至少尝试从所有内部流中拉出(尽管我可能会在这样做之前被外部流打断)。
我尝试使用包含当前内部流的外部Async
引用来执行此操作似乎以Interrupt
抛出异常而告终。
我不要一个简单的flatMap
甚至concurrent.join
。因为我的内部流是无限的,所以它们永远不会超过有限数量的内部流。
有没有办法通过 FS2 实现这一点?