我正在尝试将基于 akka 流的流集成到我的 Play 2.5 应用程序中。这个想法是您可以流式传输照片,然后将其作为原始文件、缩略图版本和水印版本写入磁盘。
我设法使用类似这样的图表来完成这项工作:
val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
.map(_.result().toArray)
def toByteArray = Flow[ByteString].map(b => b.toArray)
val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
val streamFan = builder.add(Broadcast[ByteString](3))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
val output = builder.add(Flow[ByteString].map(x => Success(Done)))
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
streamFan.out(0) ~> rawFileSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
streamFan.out(2) ~> output.in
byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
FlowShape(streamFan.in, output.out)
})
graph
}
然后我使用这样的累加器将它连接到我的播放控制器:
val sink = Sink.head[Try[Done]]
val photoStorageParser = BodyParser { req =>
Accumulator(sink).through(graph).map(Right.apply)
}
问题是我的两个处理过的文件接收器没有完成,两个处理过的文件的大小都为零,但不是原始文件。我的理论是累加器只等待我的扇出的一个输出,所以当输入流完成并且我的 byteAccumulator 吐出完整的文件时,到处理完成时,播放已经从输出中获得了物化值.
所以,我的问题是:
就我的方法而言,我是否走在正确的轨道上?运行这样的图表的预期行为是什么?我怎样才能把我所有的水槽放在一起形成一个最终的水槽?