2

我正在 Spark 中实现一个 LDA 模型(通过 Scala API),并使用不同数量的主题测试模型。一般来说,它似乎工作正常,但遇到间歇性任务失败,我很确定这与内存问题有关。我当前代码的相关部分如下。

请注意,我正在从 RDD 的文本转储中加载我的数据,其中每个文档都是一个稀疏的 mllib 向量。因此,我的文件中的示例行LDA_vectors如下所示:

(7066346,(112312,[1,3,5,7,...],[2.0,57.0,10.0,2.0,...]))

这是标准的 mllib 稀疏格式,可以读作

(document_id,(vocabulary_size,[term_id_1, term_id_2...], [term_1_freq,term_2, freq,...]))

因此该parse函数处理将其读入 RDD:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import java.io._

def parse(rdd: RDD[String]): RDD[(Long, Vector)] = {
  val pattern: scala.util.matching.Regex = "\\(([0-9]+),(.*)\\)".r
  rdd .map{
  case pattern(k, v) => (k.toLong, Vectors.parse(v))
  }
 }

val text = sc.textFile("/path/to/LDA_vectors")
val docsWithFeatures = parse(text).repartition(192).cache()

然后我在不同数量的主题上运行一个循环。请注意,用于将 word-document 矩阵保存到文件的代码块遵循此处描述的方法:

for (n_topics <- Range(10,301,5) ){

  val s = n_topics.toString
  val lda = new LDA().setK(n_topics).setMaxIterations(20) 
  val ldaModel = lda.run(docsWithFeatures)
  val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

  // write some model info to file
  val outfile = new File(s"model_summary_$s")
  @transient val bw = new BufferedWriter(new FileWriter(outfile))
  bw.write("topicConcentration:"+"\t"+distLDAModel.topicConcentration+"\n")
  bw.write("docConcentration:"+"\t"+distLDAModel.docConcentration(0)+"\n")
  bw.write("LL:"+"\t"+distLDAModel.logLikelihood+"\n")
  bw.close()

  // convert to Distributed model so we can get our topic data out 
  val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

  // Save the document-topic matrix to file
  distLDAModel.topicDistributions.saveAsTextFile(s"doc_topic_$s")

  // this saves the word-topic matrix to file
  val topic_mat = distLDAModel.topicsMatrix
  val localMatrix: List[Array[Double]] = topic_mat.transpose.toArray.grouped(topic_mat.numCols).toList
  val lines: List[String] = localMatrix.map(line => line.mkString(" "))
  val outfile2 = new File(s"artist_topic_$s")
  @transient val bw2 = new BufferedWriter(new FileWriter(outfile2))
  for (line <- lines) bw2.write(line+"\n")
  bw2.close()
}

好的。所以这一切都很好,但正如我所说,我开始遇到任务失败,我越来越有可能增加主题的数量。而且我认为这些是由内存问题引起的,这让我想知道如何在 spark 中调整 LDA 的性能。

我在 Google Cloud Dataproc 上运行,所以我的资源很灵活,但我意识到我对 Spark 的 LDA 的内部原理了解得不够多,无法知道如何在这里最好地优化性能。

到目前为止,我的一次尝试是我在这一行中所做的:

val docsWithFeatures = parse(text).repartition(192).cache()

在这里,我将我的文档 RDD 重新分区为 192 个分区(对于这个示例,我在 48 个内核上运行 spark,因此使用 4*n_cores 的经验法则)并缓存它。如果我在 RDD 上做重复映射,这是合理的,但我不确定它是否/如何帮助这里的性能。我还能在这里做什么?

为了方便任何答案,以下是我的语料库中的一些摘要统计信息:

  • 文件:166,784
  • 词汇量(唯一术语的数量):112,312
  • 总代币:4,430,237,213

也许我的大量令牌是这里的主要问题,我只需要增加每个任务的内存,即增加可用内存并减少执行程序的数量。但这当然取决于 Spark LDA 在幕后工作的准确程度。例如,请参阅我之前的问题

4

0 回答 0