In Apache Spark (the Scala shell), I am trying:
val model = ALS.trainImplicit(training, rank, numIter)
where training is built from a file with about a million rows, split into 100 partitions, with rank=20 and numIter=20.
I get a stream of messages of the form:
WARN scheduler.TaskSetManager: Stage 2175 contains a task of very large size (101 KB). The maximum recommended task size is 100 KB.
How can I debug this? I have heard that broadcast variables are useful for reducing task size, but in this case there is no large variable other than the RDD itself, and it is already split into many partitions.
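For reference, this is the sort of broadcast usage I have seen suggested; the itemNames lookup table here is purely hypothetical, since my job has no side data like this:

// Sketch only: broadcast a (hypothetical) side table so every task reads one
// shared copy instead of shipping it inside the task closure.
val itemNames: Map[Int, String] = Map(1 -> "a", 2 -> "b")  // hypothetical side data
val itemNamesBc = sc.broadcast(itemNames)
val labelled = training.map(r => (itemNamesBc.value.getOrElse(r.product, "?"), r.rating))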
The full code is below (run from the Spark shell):
Code
import java.io.File
import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
// Input file
// This file has about a million rows in the format:
// user::item::rating
val trainfile = "training_rows.txt"
// Function defs
def hash32(x: String): Int = {
  // Map an arbitrary string id to a non-negative Int
  x.hashCode() & 0x7fffffff
}
def readfile(datafile: String): RDD[Rating] = {
  // Parse "user::item::rating" lines into Ratings, then repartition and cache
  val numPartitions = 100
  sc.textFile(datafile).map { line =>
    val fields = line.split("::")
    Rating(hash32(fields(0)), hash32(fields(1)), fields(2).toDouble)
  }.repartition(numPartitions).cache()
}
// Main code
val training = readfile(trainfile)
val numTraining = training.count()
println("Training rows: " + numTraining)
val rank = 20
val numIter = 20
val model = ALS.trainImplicit(training, rank, numIter)
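For what it's worth, here is how I sanity-check the partitioning from the shell before training (standard RDD calls, nothing specific to ALS):

// Confirm the RDD really is split into many small partitions
println("Partitions: " + training.partitions.length)
val partSizes = training.glom().map(_.length).collect()  // rows per partition
println("Largest partition: " + partSizes.max + " rows")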