0

我的 Scala 应用程序启动了一个将文件写入磁盘的外部进程。在一个单独的线程中,我想读取该文件并将其内容复制到一个,OutputStream直到该过程完成并且文件不再增长。

有几个边缘情况需要考虑:

  1. 当线程准备好启动时,该文件可能还不存在。
  2. 线程的复制速度可能比进程写入的速度快。换句话说,它可能会在文件仍在增长时到达文件末尾。

顺便说一句,我可以向线程传递一个processCompletionFuture变量,该变量指示文件何时完成增长。

有没有一种优雅而有效的方法来做到这一点?也许使用 Akka Streams 或演员?(我已经尝试使用 Akka Stream 关​​闭FileInputStream,但是一旦输入流中没有更多字节,流似乎就终止了,这种情况发生在 #2 的情况下)。

4

1 回答 1

2

Alpakka 是一个基于 Akka Streams 构建的库,它有一个FileTailSource模仿tail -fUnix 命令的实用程序。例如:

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._

val path: Path = ???

val maxLineSize = 10000

val tailSource: Source[ByteString, NotUsed] = FileTailSource(
  path = path,
  maxChunkSize = maxLineSize,
  startingPosition = 0,
  pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))

上面tailSource逐行读取整个文件,并每 500 毫秒连续读取新附加的数据。要将流内容复制到OutputStream,请将源连接到接收StreamConverters.fromOutputStream器:

val stream: Future[IOResult] =
  tailSource
    .runWith(StreamConverters.fromOutputStream(() => new OutputStream {
      override def write(i: Int): Unit = ???
      override def write(bytes: Array[Byte]): Unit = ???
    }))

(请注意,有一种FileTailSource.lines方法会产生 a ,但在这种情况下,使用它而不是Source[String, NotUsed]更为合适。这就是示例使用,它产生 a 的原因。)ByteStringStringFileTailSource.apply()Source[ByteString, NotUsed]

如果文件在实现时不存在,则流将失败。因此,您需要在运行流之前确认文件的存在。这可能是矫枉过正,但一个想法是使用 Alpakka 的DirectoryChangesSource

于 2018-04-22T14:25:53.303 回答