我们实际上使用 akka 流来处理二进制文件。由于没有任何关于此的文档,因此让事情顺利进行有点棘手,但这就是我们想出的:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte)
val binSource = Source(binStream)
一旦你有了binSource
,这是一个 akka Source[Byte]
,你可以继续并开始应用你想要的任何流转换( map
、flatMap
、等......)。transform
这个功能利用了Source
伴随对象apply
,它需要一个Iterable
,传入一个 scala Stream
,它应该懒惰地读取数据并使其可用于您的转换。
编辑
正如 Konrad 在评论部分指出的那样,Stream 可能会成为大文件的问题,因为它会在延迟构建流时对遇到的元素执行记忆化。如果您不小心,这可能会导致内存不足的情况。但是,如果您查看Stream的文档,则有一个提示可以避免在内存中建立记忆:
必须小心记忆;如果您不小心,您可能会很快耗尽大量内存。原因是 Stream 的 memoization 创建了一个很像 scala.collection.immutable.List 的结构。只要有东西抓住头部,头部就会抓住尾巴,所以它会递归地继续。另一方面,如果头部没有任何东西(例如,我们使用 def 来定义 Stream),那么一旦不再直接使用它,它就会消失。
所以考虑到这一点,你可以修改我原来的例子如下:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binSource = Source(() => binStream(inputStream).iterator)
def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
所以这里的想法是构建Stream
via adef
而不是分配给 aval
然后立即从中获取iterator
并使用它来初始化 Akka Source
。以这种方式设置应该可以避免 momoization 的问题。我针对一个大文件运行旧代码,并且能够OutOfMemory
通过foreach
在Source
. 当我将它切换到新代码时,我能够避免这个问题。