Alpakka 是一个基于 Akka Streams 构建的库,它有一个FileTailSource
模仿tail -f
Unix 命令的实用程序。例如:
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._
val path: Path = ???
val maxLineSize = 10000
val tailSource: Source[ByteString, NotUsed] = FileTailSource(
path = path,
maxChunkSize = maxLineSize,
startingPosition = 0,
pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))
上面tailSource
逐行读取整个文件,并每 500 毫秒连续读取新附加的数据。要将流内容复制到OutputStream
,请将源连接到接收StreamConverters.fromOutputStream
器:
val stream: Future[IOResult] =
tailSource
.runWith(StreamConverters.fromOutputStream(() => new OutputStream {
override def write(i: Int): Unit = ???
override def write(bytes: Array[Byte]): Unit = ???
}))
(请注意,有一种FileTailSource.lines
方法会产生 a ,但在这种情况下,使用它而不是Source[String, NotUsed]
更为合适。这就是示例使用,它产生 a 的原因。)ByteString
String
FileTailSource.apply()
Source[ByteString, NotUsed]
如果文件在实现时不存在,则流将失败。因此,您需要在运行流之前确认文件的存在。这可能是矫枉过正,但一个想法是使用 Alpakka 的DirectoryChangesSource
。