3

如何加快以下scalaz-stream代码的速度?目前处理 70MB 的文本大约需要 5 分钟,所以我可能做错了,因为普通的 scala 等价物需要几秒钟。

(跟进另一个问题

  val converter2: Task[Unit] = {
    val docSep = "~~~"
    io.linesR("myInput.txt")
      .flatMap(line => { val words = line.split(" ");
          if (words.length==0 || words(0)!=docSep) Process(line)
          else Process(docSep, words.tail.mkString(" ")) })
      .split(_ == docSep)
      .filter(_ != Vector())
      .map(lines => lines.head + ": " + lines.tail.mkString(" "))
      .intersperse("\n")
      .pipe(text.utf8Encode)
      .to(io.fileChunkW("correctButSlowOutput.txt"))
      .run
  }
4

2 回答 2

0

我认为您可以只使用其中一种 process1 块方法来块。如果您希望在将行合并为输出格式时进行大量并行处理,请确定有序输出是否重要并使用与合并或 tee 组合的通道。这也将使其可重复使用。因为您正在执行非常少量的处理,所以您可能会被开销所淹没,因此您必须更加努力地使您的工作单元足够大而不会被淹没。

于 2015-05-14T23:38:33.680 回答
0

以下内容基于@user1763729 的分块建议。虽然感觉很笨拙,而且和原始版本一样慢。

  val converter: Task[Unit] = {
    val docSep = "~~~"
    io.linesR("myInput.txt")
      .intersperse("\n") // handle empty documents (chunkBy has to switch from true to false)
      .zipWithPrevious // chunkBy cuts only *after* the predicate turns false
      .chunkBy{ 
        case (Some(prev), line) => { val words = line.split(" "); words.length == 0 || words(0) != docSep } 
        case (None, line) => true }
      .map(_.map(_._1.getOrElse(""))) // get previous element
      .map(_.filter(!Set("", "\n").contains(_)))
      .map(lines => lines.head.split(" ").tail.mkString(" ") + ": " + lines.tail.mkString(" "))
      .intersperse("\n")
      .pipe(text.utf8Encode)
      .to(io.fileChunkW("stillSlowOutput.txt"))
      .run
  }

编辑:

实际上,执行以下操作(仅读取文件,不写入或处理)已经需要 1.5 分钟,所以我想加快速度的希望并不大。

  val converter: Task[Unit] = {
    io.linesR("myInput.txt")
      .pipe(text.utf8Encode)
      .run
  }
于 2015-06-28T08:35:28.757 回答