4

我正在尝试找出使用 scala 中的字符串来处理大量数据的高效内存和功能方法。我读过很多关于惰性集合的东西,也看过很多代码示例。但是,我一次又一次地遇到“超出 GC 开销”或“Java 堆空间”问题。

通常问题是我尝试构建一个惰性集合,但是当我将每个新元素附加到不断增长的集合时评估它(我现在没有任何其他方式来增量这样做)。当然,我可以尝试先初始化一个初始惰性集合,然后通过使用 map 左右应用资源关键计算来生成包含所需值的集合,但通常我只是不知道最终集合的确切大小先验地初始化该惰性集合。

也许您可以通过给我提示或解释如何改进以下代码作为示例来帮助我,该代码根据奇数序列对属于一个文件和偶数序列对属于一个文件的规则将 FASTA(以下定义)格式的文件拆分为两个单独的文件到另一个(“股线分离”)。“最”直接的方法是通过循环遍历行并通过打开的文件流打印到相应的文件中的命令方式(这当然效果很好)。但是,我只是不喜欢重新分配给包含标题和序列的变量的风格,因此下面的示例代码使用(尾)递归,我希望找到一种方法来维护类似的设计而不会遇到资源问题!

该示例非常适用于小文件,但是对于大约 500mb 左右的文件,代码将在标准 JVM 设置中失败。我确实想处理“任意”大小的文件,比如 10-20gb 左右。

val fileName = args(0)
val in = io.Source.fromFile(fileName) getLines

type itType = Iterator[String]
type sType = Stream[(String, String)]

def getFullSeqs(ite: itType) = {
    //val metaChar = ">"
    val HeadPatt = "(^>)(.+)" r
    val SeqPatt  = "([\\w\\W]+)" r
    @annotation.tailrec
    def rec(it: itType, out: sType = Stream[(String, String)]()): sType = 
        if (it hasNext) it next match  {
            case HeadPatt(_,header) =>
                // introduce new header-sequence pair
                rec(it, (header, "") #:: out)
            case SeqPatt(seq) =>
                val oldVal = out head
                // concat subsequences
                val newStream = (oldVal._1, oldVal._2 + seq) #:: out.tail    
                rec(it, newStream)
            case _ =>
                println("something went wrong my friend, oh oh oh!"); Stream[(String, String)]()                
        } else out
    rec(ite)    
}

def printStrands(seqs: sType) {
   import java.io.PrintWriter
   import java.io.File
   def printStrand(seqse: sType, strand: Int) {
        // only use sequences of one strand 
        val indices =  List.tabulate(seqs.size/2)(_*2 + strand - 1).view
        val p = new PrintWriter(new File(fileName + "." + strand))
        indices foreach { i =>
              p.print(">" + seqse(i)._1 + "\n" + seqse(i)._2 + "\n")
        }; p.close
       println("Done bro!")
   }
   List(1,2).par foreach (s => printStrand(seqs, s))
}

printStrands(getFullSeqs(in))

我提出了三个问题:

A)假设需要维护一个大型数据结构,该结构是通过处理您getLines在我的getFullSeqs方法中获得的初始迭代器获得的(注意 的不同大小in和输出getFullSeqs),因为需要重复对整个(!)数据进行转换,因为人们不知道在任何步骤中都需要哪一部分数据。我的例子可能不是最好的,但是怎么做呢?有可能吗?

B)当所需的数据结构不是天生懒惰的时候,比如说想要将这些(header -> sequence)对存储到 a 中Map()怎么办?你会把它包装在一个惰性集合中吗?

C)我构建流的实现可能会颠倒输入行的顺序。调用 reverse 时,将评估所有元素(在我的代码中,它们已经是,所以这是实际问题)。有没有办法以懒惰的方式“从后面”进行后期处理?我知道reverseIterator,但这已经是解决方案了吗,或者这实际上也不会首先评估所有元素(因为我需要在列表中调用它)?可以用 构造流newVal #:: rec(...),但那样我会失去尾递归,不是吗?

所以我基本上需要的是将元素添加到集合中,而不是通过添加过程来评估。所以lazy val elem = "test"; elem :: lazyCollection不是我要找的。

编辑:我也尝试在rec.

非常感谢您的关注和时间,我非常感谢您的帮助(再次:))。

///////////////////////////////////////// ///////////////////////////////////////// ///////////////////////////////////////// ///////////////

FASTA 被定义为由单个标题行分隔的一组顺序序列。标题定义为以“>”开头的行。标题下方的每一行都称为与标题相关的序列的一部分。当出现新标头时,序列结束。每个标题都是唯一的。例子:

>HEADER1
abcdefg
>HEADER2
hijklmn
opqrstu
>HEADER3
vwxyz
>HEADER4
zyxwv

因此,序列 2 是 seq 1 的两倍。我的程序会将该文件拆分为一个文件 A,其中包含

>HEADER1
abcdefg
>HEADER3
vwxyz

和第二个文件 B 包含

>HEADER2
hijklmn
opqrstu
>HEADER4
zyxwv

假设输入文件由偶数个头序列对组成。

4

2 回答 2

4

使用非常大的数据结构的关键是只在内存中保存对执行所需的任何操作至关重要的数据。所以,在你的情况下,那就是

  • 你的输入文件
  • 你的两个输出文件
  • 当前行文本

就是这样。在某些情况下,您可能需要存储信息,例如序列的长度;在这种情况下,您在第一遍中构建数据结构并在第二遍中使用它们。例如,假设您决定要编写三个文件:一个用于偶数记录,一个用于奇数记录,一个用于总长度小于 300 个核苷酸的条目。你会做这样的事情(警告 - 它编译但我从未运行过它,所以它可能实际上不起作用):

final def findSizes(
  data: Iterator[String], sz: Map[String,Long] = Map(),
  currentName: String = "", currentSize: Long = 0
): Map[String,Long] = {
  def currentMap = if (currentName != "") sz + (currentName->currentSize) else sz
  if (!data.hasNext) currentMap
  else {
    val s = data.next
    if (s(0) == '>') findSizes(data, currentMap, s, 0)
    else findSizes(data, sz, currentName, currentSize + s.length)
  }
}

然后,为了处理,您使用该地图并再次通过:

import java.io._
final def writeFiles(
  source: Iterator[String], targets: Array[PrintWriter],
  sizes: Map[String,Long], count: Int = -1, which: Int = 0
) {
  if (!source.hasNext) targets.foreach(_.close)
  else {
    val s = source.next
    if (s(0) == '>') {
      val w = if (sizes.get(s).exists(_ < 300)) 2 else (count+1)%2
      targets(w).println(s)
      writeFiles(source, targets, sizes, count+1, w)
    }
    else {
      targets(which).println(s)
      writeFiles(source, targets, sizes, count, which)
    }
  }
}

然后,您使用Source.fromFile(f).getLines()两次来创建迭代器,一切就绪。 编辑:从某种意义上说,这是关键步骤,因为这是您的“懒惰”收藏。但是,这并不重要,因为它不会立即读取所有内存(“懒惰”),而是因为它也不存储任何先前的字符串!

更一般地说,Scala 无法帮助您仔细考虑您需要在内存中拥有哪些信息以及您可以根据需要从磁盘中获取哪些信息。惰性求值有时会有所帮助,但没有神奇的公式,因为您可以轻松地表达将所有数据以惰性方式存储在内存中的要求。Scala不能将您访问内存的命令秘密地解释为从磁盘中获取内容的指令。(好吧,除非您编写一个库来缓存磁盘中的结果,否则不会这样做。)

于 2012-06-05T12:00:09.620 回答
4

可以使用 newVal #:: rec(...) 构造流,但那样我会失去尾递归,不是吗?

实际上,没有。

所以,事情就是这样……用你现在的尾递归,你用值填充所有的。Stream是的,Stream是懒惰的,但是你正在计算所有的元素,剥去它的任何懒惰。

现在说你做newVal #:: rec(...)。你会失去尾递归吗?没有为什么?因为你没有递归。怎么会?嗯,Stream是懒惰的,所以它不会评估rec(...)

这就是它的美妙之处。一旦你这样做,在第一次交互时返回,并且只在请求getFullSeqs时计算“递归” 。printStrands不幸的是,这不会按原样工作......

问题是你一直在修改Stream——这不是你使用Stream. 使用Stream,您总是附加到它。不要一直“重写” Stream.

现在,我可以轻松识别出其他三个问题printStrands。首先,它调用sizeseqs这将导致整体Stream被处理,失去惰性。永远不要调用size. Stream其次,您调用applyseqse通过索引访问它。永远不要调用apply( Streamor List) - 这是非常低效的。它是O(n),这使您的内部循环O(n^2)- 是的,是输入文件中标题数量的二次方!最后,在整个执行过程中printStrands保持对 的引用,防止处理元素被垃圾回收。seqsprintStrand

所以,这是第一个近似值:

def inputStreams(fileName: String): (Stream[String], Stream[String]) = {
  val in = (io.Source fromFile fileName).getLines.toStream
  val SeqPatt = "^[^>]".r
  def demultiplex(s: Stream[String], skip: Boolean): Stream[String] = {
    if (s.isEmpty) Stream.empty
    else if (skip) demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = false)
         else s.head #:: (s.tail takeWhile (SeqPatt findFirstIn _ nonEmpty)) #::: demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = true)
  }
  (demultiplex(in, skip = false), demultiplex(in, skip = true))
}

上面的问题,我展示代码只是为了进一步指导懒惰问题,是你这样做的那一刻:

val (a, b) = inputStreams(fileName)

您将保留对两个流的头部的引用,以防止垃圾收集它们。您无法保留对它们的引用,因此您必须在获得它们后立即使用它们,而无需将它们存储在“val”或“lazy val”中。“var”可能会,但处理起来会很棘手。所以让我们试试这个:

def inputStreams(fileName: String): Vector[Stream[String]] = {
  val in = (io.Source fromFile fileName).getLines.toStream
  val SeqPatt = "^[^>]".r
  def demultiplex(s: Stream[String], skip: Boolean): Stream[String] = {
    if (s.isEmpty) Stream.empty
    else if (skip) demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = false)
         else s.head #:: (s.tail takeWhile (SeqPatt findFirstIn _ nonEmpty)) #::: demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = true)
  }
  Vector(demultiplex(in, skip = false), demultiplex(in, skip = true))
}

inputStreams(fileName).zipWithIndex.par.foreach { 
  case (stream, strand) => 
    val p = new PrintWriter(new File("FASTA" + "." + strand))
    stream foreach p.println
    p.close
}

这仍然行不通,因为streaminsideinputStreams用作参考,即使在打印时也将整个流保留在内存中。

那么,再次失败,我有什么建议?把事情简单化。

def in = (scala.io.Source fromFile fileName).getLines.toStream
def inputStream(in: Stream[String], strand: Int = 1): Stream[(String, Int)] = {
  if (in.isEmpty) Stream.empty
  else if (in.head startsWith ">") (in.head, 1 - strand) #:: inputStream(in.tail, 1 - strand)
       else                        (in.head, strand) #:: inputStream(in.tail, strand)
}
val printers = Array.tabulate(2)(i => new PrintWriter(new File("FASTA" + "." + i)))
inputStream(in) foreach {
  case (line, strand) => printers(strand) println line
}
printers foreach (_.close)

现在,这将不再保留在内存中而不是必要的。但是,我仍然认为它太复杂了。这可以像这样更容易地完成:

def in = (scala.io.Source fromFile fileName).getLines
val printers = Array.tabulate(2)(i => new PrintWriter(new File("FASTA" + "." + i)))
def printStrands(in: Iterator[String], strand: Int = 1) {
  if (in.hasNext) {
    val next = in.next
    if (next startsWith ">") { 
      printers(1 - strand).println(next)
      printStrands(in, 1 - strand)
    } else {
      printers(strand).println(next)
      printStrands(in, strand)
    }
  }
}
printStrands(in)
printers foreach (_.close)

或者只是使用while循环而不是递归。

现在,对于其他问题:

B)在阅读时这样做可能很有意义,这样您就不必保留数据的两个副本: theMap和 a Seq

C) 不要反转Streama——你会失去它所有的懒惰。

于 2012-06-05T17:12:18.750 回答