2

我正在使用Alpakka-FTP,但也许我正在寻找一般的 akka-stream 模式。FTP 连接器可以列出文件或检索它们:

def ls(host: String): Source[FtpFile, NotUsed]
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]]

理想情况下,我想创建一个这样的流:

LIST
  .FETCH_ITEM
  .FOREACH(do something)

但是我无法使用上面编写的两个函数创建这样的流。我觉得我应该能够使用 a 到达那里Flow,例如

Ftp.ls
  .via(some flow that uses the Ftp.fromPath above)
  .runWith(Sink.foreach(do something))

ls仅考虑上面的和fromPath功能,这可能吗?

编辑:

我可以使用一个演员和 来解决这个问题mapAsync,但我仍然觉得它应该更直接。

class Downloader extends Actor {
  override def receive = {
    case ftpFile: FtpFile =>
      Ftp.fromPath(Paths.get(ftpFile.path), settings)
        .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right)
        .run() pipeTo sender
  }
}

val downloader = as.actorOf(Props(new Downloader))

Ftp.ls("test_path", settings)
  .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult])
  .runWith(Sink.foreach(res => println("got it!" + res)))
4

1 回答 1

1

您应该能够flatMapConcat用于此目的。您的具体示例可以重写为

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile =>
  Ftp.fromPath(Paths.get(ftpFile.path), settings)
}.runWith(FileIO.toPath(Paths.get("testHDF.txt")))

文档在这里

于 2017-01-22T19:43:02.370 回答