0

我正在使用 ZIO Streams 创建一个 ByteArrayOutputStream,即:

lazy val byteArrayOutputStream = new ByteArrayOutputStream()
val sink = ZSink.fromOutputStream(byteArrayOutputStream).contramapChunks[String](_.flatMap(_.getBytes)
val data = ZStream.unwrap(callToFunction).run(sink)

这工作正常 - 现在我需要使用 akka http 将这些数据流式传输回客户端。我可以做这个:

val arr = byteArrayOutputStream.toByteArray
complete(HttpEntity(ContentTypes.`application/octet-stream`, arr)

这有效,但当然 toByteArray 将输出流带入内存,即我不流式传输数据。我遗漏了一些明显的东西 - 有没有简单的方法可以做到这一点?

4

1 回答 1

0

您可以将输出流转换为 Akka Stream Source

val byteArrayOutputStream = new ByteArrayOutputStream()
val source = StreamConverters.asOutputStream().mapMaterializedValue(_ => byteArrayOutputStream)

然后简单地创建一个分块的 HTTP 实体:

HttpResponse(entity = HttpEntity.Chunked.fromData(ContentTypes.`application/octet-stream`, source))

有关分块传输的更多信息:https ://datatracker.ietf.org/doc/html/rfc7230#section-4.1

对于 ZIO,您可能会使用如下内容:

val zSource = ZStream.fromOutputStreamWriter(os => byteArrayOutputStream.writeTo(os))

但是,您需要找到一种将 ZStream 转换为 Akka Stream Source 的方法。

于 2021-11-29T12:46:46.200 回答