1

我想为目录和子目录中的每个文件应用一个函数,如下所示:

  def applyRecursively(dir: String, fn: (File) => Any) {
    def listAndProcess(dir: File) {
      dir.listFiles match {
        case null => out.println("exception: dir cannot be listed: " + dir.getPath); List[File]()
        case files => files.toList.sortBy(_.getName).foreach(file => {
          fn(file)
          if (!java.nio.file.Files.isSymbolicLink(file.toPath) && file.isDirectory) listAndProcess(file)
        })
      }
    }
    listAndProcess(new File(dir))
  }

  def exampleFn(file: File) { println(s"processing $file") } 

  applyRecursively(dir, exampleFn)

这行得通。这里的问题是如何使用 scala Iteratees 重构此代码。像这样的东西:

val en = Enumerator.generateM(...) // ???
val it: Iteratee[File, Unit] = Iteratee.foreach(exampleFn)
val res = en.run(it)
res.onSuccess { case x => println("DONE") }
4

4 回答 4

3

你可以用Enumerator.unfold这个。签名是:

def unfold[S, E](s: S)(f: (S) => Option[(S, E)])(implicit ec: ExecutionContext): Enumerator[E]

这个想法是你从一个类型的值开始S,然后对它应用一个返回一个Option[(S, E)]. 值None表示Enumerator已达到 EOF。ASome包含S要展开的另一个,以及Enumerator[E]将生成的下一个值。在您的示例中,您可以从Array[File](初始目录)开始,从 中获取第一个值Array,然后检查它是文件还是目录。如果它只是一个文件,则返回Array带有元组的尾部File。如果File是一个目录,您将获得文件列表并将其添加到Array. 然后接下来的步骤unfold将继续处理包含的文件。

你最终会得到这样的结果:

def list(dir: File)(implicit ec: ExecutionContext): Enumerator[File] = {
  Enumerator.unfold(Array(dir)) { listing =>
    listing.headOption.map { file =>
      if(!java.nio.file.Files.isSymbolicLink(file.toPath) && file.isDirectory)
        (file.listFiles.sortBy(f => (f.isDirectory, f.getName)) ++ listing.tail) -> file
      else
        listing.tail -> file
    }
  }
}

我添加了一个额外的排序依据isDirectory,首先优先考虑非目录。这意味着如果将目录内容添加到Array展开,则在添加更多内容之前将首先使用文件。由于递归性质,这将防止内存占用迅速扩大。

如果您希望从 final 中删除目录Enumerator,您可以使用它Enumeratee.filter来执行此操作。你最终会得到类似的东西:

list(dir) &> Enumeratee.filter(!_.isDirectory) |>> Iteratee.foreach(fn)
于 2016-03-29T16:44:09.803 回答
2

它不能满足您的所有要求,但这可以帮助您入门

object ExampleEnumerator {
  import scala.concurrent.ExecutionContext.Implicits.global

  def exampleFn(file: File) { println(s"processing $file") }

  def listFiles(dir: File): Enumerator[File] = {
    val files = Option(dir.listFiles).toList.flatten.sortBy(_.getName)

    Enumerator(dir) andThen Enumerator(files :_*).flatMap(listFiles)
  }

  def main(args: Array[String]) {
    import scala.concurrent.duration._

    val dir = "."
    val en: Enumerator[File] = listFiles(new File(dir))
    val it: Iteratee[File, Unit] = Iteratee.foreach(exampleFn)
    val res = en.run(it)
    res.onSuccess { case x => println("DONE") }

    Await.result(res, 10.seconds)
  }
}
于 2016-03-29T10:21:00.253 回答
2

这只是补充了m-w一些日志记录以帮助理解它的好答案。

$ cd /david/test
$ find .
.
./file1
./file2
./file3d
./file3d/file1
./file3d/file2
./file4

爪哇:

import play.api.libs.iteratee._
import java.io.File
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object ExampleEnumerator3 {
  import scala.concurrent.ExecutionContext.Implicits.global

  def exampleFn(file: File) { println(s"processing $file") }

  def list(dir: File): Enumerator[File] = {
    println(s"list $dir")
    val initialInput: List[File] = List(dir)
    Enumerator.unfold(initialInput) { (input: List[File]) =>
      val next: Option[(List[File], File)] = input.headOption.map { file =>
        if(file.isDirectory) {
          (file.listFiles.toList.sortBy(_.getName) ++ input.tail) -> file
        } else {
          input.tail -> file
        }
      }
      next match {
        case Some(dn) => print(s"value to unfold: $input\n  next value to unfold: ${dn._1}\n  next input: ${dn._2}\n")
        case None => print(s"value to unfold: $input\n  finished unfold\n")
      }
      next
    }
  }

  def main(args: Array[String]) {
    val dir = new File("/david/test")
    val res = list(dir).run(Iteratee.foreach(exampleFn))
    Await.result(res, Duration.Inf)
  }
}

日志:

list /david/test
value to unfold: List(/david/test)
  next value to unfold: List(/david/test/file1, /david/test/file2, /david/test/file3d, /david/test/file4)
  next input: /david/test
processing /david/test
value to unfold: List(/david/test/file1, /david/test/file2, /david/test/file3d, /david/test/file4)
  next value to unfold: List(/david/test/file2, /david/test/file3d, /david/test/file4)
  next input: /david/test/file1
processing /david/test/file1
value to unfold: List(/david/test/file2, /david/test/file3d, /david/test/file4)
  next value to unfold: List(/david/test/file3d, /david/test/file4)
  next input: /david/test/file2
processing /david/test/file2
value to unfold: List(/david/test/file3d, /david/test/file4)
  next value to unfold: List(/david/test/file3d/file1, /david/test/file3d/file2, /david/test/file4)
  next input: /david/test/file3d
processing /david/test/file3d
value to unfold: List(/david/test/file3d/file1, /david/test/file3d/file2, /david/test/file4)
  next value to unfold: List(/david/test/file3d/file2, /david/test/file4)
  next input: /david/test/file3d/file1
processing /david/test/file3d/file1
value to unfold: List(/david/test/file3d/file2, /david/test/file4)
  next value to unfold: List(/david/test/file4)
  next input: /david/test/file3d/file2
processing /david/test/file3d/file2
value to unfold: List(/david/test/file4)
  next value to unfold: List()
  next input: /david/test/file4
processing /david/test/file4
value to unfold: List()
  finished unfold
于 2016-03-29T23:02:02.097 回答
2

这只是通过一些日志记录来补充@JonasAnso 的出色答案以帮助理解它。

$ cd /david/test
$ find .
.
./file1
./file2
./file3d
./file3d/file1
./file3d/file2
./file4

爪哇:

import play.api.libs.iteratee._
import java.io.File
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object ExampleEnumerator2b {
  import scala.concurrent.ExecutionContext.Implicits.global

  def exampleFn(file: File) { println(s"processing $file") }

  def listFiles(dir: File): Enumerator[File] = {
    println(s"listFiles. START: $dir")

    if (dir.isDirectory) {
      val files = dir.listFiles.toList.sortBy(_.getName)
      Enumerator(dir) andThen Enumerator(files :_*).flatMap(listFiles)
    } else {
      Enumerator(dir)
    }
  }

  def main(args: Array[String]) {
    val dir = new File("/david/test2")
    val res = listFiles(dir).run(Iteratee.foreach(exampleFn))
    Await.result(res, Duration.Inf)
  }
}

日志:

listFiles. START: /david/test
processing /david/test
listFiles. START: /david/test/file1
processing /david/test/file1
listFiles. START: /david/test/file2
processing /david/test/file2
listFiles. START: /david/test/file3d
processing /david/test/file3d
listFiles. START: /david/test/file3d/file1
processing /david/test/file3d/file1
listFiles. START: /david/test/file3d/file2
processing /david/test/file3d/file2
listFiles. START: /david/test/file4
processing /david/test/file4
于 2016-03-29T23:13:17.797 回答