我正在 Spark 中实现一个 LDA 模型(通过 Scala API),并使用不同数量的主题测试模型。一般来说,它似乎工作正常,但遇到间歇性任务失败,我很确定这与内存问题有关。我当前代码的相关部分如下。
请注意,我正在从 RDD 的文本转储中加载我的数据,其中每个文档都是一个稀疏的 mllib 向量。因此,我的文件中的示例行LDA_vectors
如下所示:
(7066346,(112312,[1,3,5,7,...],[2.0,57.0,10.0,2.0,...]))
这是标准的 mllib 稀疏格式,可以读作
(document_id,(vocabulary_size,[term_id_1, term_id_2...], [term_1_freq,term_2, freq,...]))
因此该parse
函数处理将其读入 RDD:
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import java.io._
def parse(rdd: RDD[String]): RDD[(Long, Vector)] = {
val pattern: scala.util.matching.Regex = "\\(([0-9]+),(.*)\\)".r
rdd .map{
case pattern(k, v) => (k.toLong, Vectors.parse(v))
}
}
val text = sc.textFile("/path/to/LDA_vectors")
val docsWithFeatures = parse(text).repartition(192).cache()
然后我在不同数量的主题上运行一个循环。请注意,用于将 word-document 矩阵保存到文件的代码块遵循此处描述的方法:
for (n_topics <- Range(10,301,5) ){
val s = n_topics.toString
val lda = new LDA().setK(n_topics).setMaxIterations(20)
val ldaModel = lda.run(docsWithFeatures)
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
// write some model info to file
val outfile = new File(s"model_summary_$s")
@transient val bw = new BufferedWriter(new FileWriter(outfile))
bw.write("topicConcentration:"+"\t"+distLDAModel.topicConcentration+"\n")
bw.write("docConcentration:"+"\t"+distLDAModel.docConcentration(0)+"\n")
bw.write("LL:"+"\t"+distLDAModel.logLikelihood+"\n")
bw.close()
// convert to Distributed model so we can get our topic data out
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
// Save the document-topic matrix to file
distLDAModel.topicDistributions.saveAsTextFile(s"doc_topic_$s")
// this saves the word-topic matrix to file
val topic_mat = distLDAModel.topicsMatrix
val localMatrix: List[Array[Double]] = topic_mat.transpose.toArray.grouped(topic_mat.numCols).toList
val lines: List[String] = localMatrix.map(line => line.mkString(" "))
val outfile2 = new File(s"artist_topic_$s")
@transient val bw2 = new BufferedWriter(new FileWriter(outfile2))
for (line <- lines) bw2.write(line+"\n")
bw2.close()
}
好的。所以这一切都很好,但正如我所说,我开始遇到任务失败,我越来越有可能增加主题的数量。而且我认为这些是由内存问题引起的,这让我想知道如何在 spark 中调整 LDA 的性能。
我在 Google Cloud Dataproc 上运行,所以我的资源很灵活,但我意识到我对 Spark 的 LDA 的内部原理了解得不够多,无法知道如何在这里最好地优化性能。
到目前为止,我的一次尝试是我在这一行中所做的:
val docsWithFeatures = parse(text).repartition(192).cache()
在这里,我将我的文档 RDD 重新分区为 192 个分区(对于这个示例,我在 48 个内核上运行 spark,因此使用 4*n_cores 的经验法则)并缓存它。如果我在 RDD 上做重复映射,这是合理的,但我不确定它是否/如何帮助这里的性能。我还能在这里做什么?
为了方便任何答案,以下是我的语料库中的一些摘要统计信息:
- 文件:166,784
- 词汇量(唯一术语的数量):112,312
- 总代币:4,430,237,213
也许我的大量令牌是这里的主要问题,我只需要增加每个任务的内存,即增加可用内存并减少执行程序的数量。但这当然取决于 Spark LDA 在幕后工作的准确程度。例如,请参阅我之前的问题。