2

我想转换fs2.Stream为,java.io.InputStream以便可以将该输入流传递给 http 框架(Finch 和 Akka Http)。

我找到了 a fs2.io.toInputStream,但这不起作用(它什么也不打印):

import java.io.{ByteArrayInputStream, InputStream}

import cats.effect.IO
import scala.concurrent.ExecutionContext.Implicits.global

object IOTest {

  def main(args: Array[String]): Unit = {
    val is: InputStream = new ByteArrayInputStream("test".getBytes)
    val stream: fs2.Stream[IO, Byte] = fs2.io.readInputStream(IO(is), 128)

    val test: Seq[InputStream] = stream.through(fs2.io.toInputStream).compile.toList.unsafeRunSync()

    println(scala.io.Source.fromInputStream(test.head).mkString)
  }
}

据我了解,当我运行.unsafeRunSync()它时,它会消耗整个流,所以即使它返回一个Seq[InputStream]底层输入流也已经被消耗了。

有什么方法可以在不消耗的情况下转换fs2.Stream[IO, Byte]为?java.io.InputStream

谢谢!

4

2 回答 2

1

问题是它compile被过早地调用了。我确信在引擎盖下fs2.io.toInputStream做了正确的事情并将创建的InputStream. 这意味着InputStream必须在自身内部Stream访问(例如,在map/flatMap调用中):

val wire: fs2.Stream[IO, Byte] = ???

val result: fs2.Stream[IO, String] = for {
  is <- wire.through(fs2.io.toInputStream)
  str = scala.io.Source.fromInputStream(is).mkString //<--- use the InputStream here
} yield str

println( result.compile.lastOrError.unsafeRunSync() ) //<--- compile at the _very_ end

输出:

测试

于 2019-02-02T05:34:13.433 回答
-1

看起来 Finch 支持 fs2 https://github.com/finagle/finch/tree/master/fs2而 Akka 也有它的流实现,并且有 fs2 - Akka Stream 互操作库,如https://github.com/krasserm /streamz/tree/master/streamz 转换器

因此,我建议您查看实现,因为它们负责资源生命周期。可能您不需要整个库,但它可以作为指导。

如果你从 fs2 的“安全区”开始,为什么要离开那里:)

于 2020-04-17T16:53:12.070 回答