9

是否有一些使用org.reactivestreams库使用 Java NIO 处理大型数据流的代码示例(用于高性能)?我的目标是分布式处理,所以使用 Akka 的例子是最好的,但我可以弄清楚。

似乎大多数(我希望不是全部)在 scala 中读取文件的示例都求助于Source(非二进制)或直接 Java NIO(甚至是类似的东西Files.readAllBytes!)

也许我错过了一个激活器模板?(Akka Streams with Scala!可以解决我需要的一切,除了二进制/NIO 端)

4

2 回答 2

9

不要scala.collection.immutable.Stream用来消费这样的文件,原因是它执行记忆 - 也就是说,虽然它是懒惰的,但它会将整个流缓冲(记忆)在内存中!

当您考虑“流处理文件”时,这绝对不是您想要的。Scala 的 Stream 这样工作的原因是因为在功能设置中它完全有意义 - 例如,由于这一点,您可以避免一次又一次地轻松计算斐波那契数,有关更多详细信息,请参阅ScalaDoc

Akka Streams 提供了 Reactive Streams 实现并提供了一个FileIO可以在此处使用的类(它会适当地反压并仅在需要时从文件中提取数据,并且流的其余部分已准备好使用它):

import java.io._
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object ExampleApp extends App {


  implicit val sys = ActorSystem()
  implicit val mat = FlowMaterializer()

  FileIO.fromPath(Paths.get("/example/file.txt"))
    .map(c ⇒ { print(c); c })
    .runWith(Sink.onComplete(_ ⇒ { f.close(); sys.shutdown() } ))
}

这里有更多关于使用 Akka Streams 处理 IO 的文档 请注意,这是针对 Akka 的当前编写版本,所以是 2.5.x 系列。

希望这可以帮助!

于 2015-01-07T12:10:42.253 回答
4

我们实际上使用 akka 流来处理二进制文件。由于没有任何关于此的文档,因此让事情顺利进行有点棘手,但这就是我们想出的:

val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte) 
val binSource = Source(binStream)

一旦你有了binSource,这是一个 akka Source[Byte],你可以继续并开始应用你想要的任何流转换( mapflatMap、等......)。transform这个功能利用了Source伴随对象apply,它需要一个Iterable,传入一个 scala Stream,它应该懒惰地读取数据并使其可用于您的转换。

编辑

正如 Konrad 在评论部分指出的那样,Stream 可能会成为大文件的问题,因为它会在延迟构建流时对遇到的元素执行记忆化。如果您不小心,这可能会导致内存不足的情况。但是,如果您查看Stream的文档,则有一个提示可以避免在内存中建立记忆:

必须小心记忆;如果您不小心,您可能会很快耗尽大量内存。原因是 Stream 的 memoization 创建了一个很像 scala.collection.immutable.List 的结构。只要有东西抓住头部,头部就会抓住尾巴,所以它会递归地继续。另一方面,如果头部没有任何东西(例如,我们使用 def 来定义 Stream),那么一旦不再直接使用它,它就会消失。

所以考虑到这一点,你可以修改我原来的例子如下:

val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))     
val binSource = Source(() => binStream(inputStream).iterator)

def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)

所以这里的想法是构建Streamvia adef而不是分配给 aval然后立即从中获取iterator并使用它来初始化 Akka Source。以这种方式设置应该可以避免 momoization 的问题。我针对一个大文件运行旧代码,并且能够OutOfMemory通过foreachSource. 当我将它切换到新代码时,我能够避免这个问题。

于 2015-01-05T13:25:15.983 回答