I have a DataFrame like this:
val temp = sc.parallelize(Seq(Array(43,53,266),Array(69,160,166),Array(266)))
.toDF("value")
I want to select the rows whose array intersects with the following array:
val goodValue = Array(231, 266)
val broadcastGood = sc.broadcast(goodValue)
val containGood = udf((array:scala.collection.mutable.WrappedArray[Int]) =>
broadcastGood.value.intersect(array).size>0)
When I try this UDF,
display(temp.filter(containGood(col("value"))))
I get the infamous error: Task not serializable
The strange thing is that this used to work fine for me. I don't know what changed. I would really appreciate some help.
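Edit: The code above should actually work fine in general, and the broadcast variable is not needed here. Some of you mentioned that "one of the values is in a Scala class that is not serializable". I agree that this is probably the problem, but I don't know how to fix it.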
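For reference, here is a minimal sketch of the intersection check on its own, with no Spark involved. It only shows which rows the filter is expected to keep. The names IntersectCheck and containsGood are made up for illustration, and goodValue is a Set here instead of an Array.

```scala
object IntersectCheck {
  // Same test as the UDF body: does the row share at least one element with `good`?
  def containsGood(good: Set[Int])(row: Seq[Int]): Boolean =
    row.exists(good.contains)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Seq(43, 53, 266), Seq(69, 160, 166), Seq(266))
    val goodValue = Set(231, 266)
    // Keeps the first and third rows, since both contain 266.
    rows.filter(containsGood(goodValue)).foreach(println)
  }
}
```

Since the predicate depends only on its two arguments, it has nothing from the surrounding scope to capture.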
Here is the background: I am using Latent Dirichlet Allocation (LDA) to do topic analysis on a corpus:
val ldaModel = lda.fit(dfVectorizer)
dfVectorizer is the vectorized version of my original dataset. With this LDA model, I generate the following dataset:
// describeTopics returns three columns: [topic: int, termIndices: array<int>, termWeights: array<double>]
val topic = ldaModel.describeTopics(50)
val interestTerms = Seq(1, 2, 3, 4, 5, 6, 7)
val interestUDF = udf((terms: Seq[Int]) => terms.filter(r => interestTerms.contains(r)))
val topicTmp = topic.withColumn("InterestTerm", interestUDF(col("termIndices")))
val sumVec = udf((terms: Seq[Int]) => terms.sum)
val topicDF = topicTmp.select('topic, sumVec('InterestTerm).as('order)).sort('order.desc)
So the final DataFrame, topicDF, looks like this:
Topic | Order
111 | 7
69 | 7
248 | 5
......
However, if I try to run a simple filter like this:
display(topicDF.filter("order>3"))
it gives me a "Task not serializable" error. The error message states quite clearly that it is "Caused by"
java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel.
The error message looks like this:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2135)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:133)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2807)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
at org.apache.spark.sql.Dataset$$anonfun$60.apply(Dataset.scala:2791)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87)
at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2132)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2345)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:81)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:42)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:263)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:254)
at scala.Option.map(Option.scala:145)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:254)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:228)
at scala.Option.map(Option.scala:145)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.getResultBuffer(ScalaDriverLocal.scala:228)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:209)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
at scala.util.Try$.apply(Try.scala:161)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel
Serialization stack:
- object not serializable (class: org.apache.spark.mllib.clustering.DistributedLDAModel, value: org.apache.spark.mllib.clustering.DistributedLDAModel@ea5b214)
- writeObject data (class: scala.collection.mutable.HashMap)
- object (class scala.collection.mutable.HashMap, Map(lda_1da3e45afeaa__subsamplingRate -> 0.05, lda_1da3e45afeaa__k -> 320, lda_1da3e45afeaa__keepLastCheckpoint -> true, lda_1da3e45afeaa__maxIter -> 100, lda_1da3e45afeaa__optimizer -> em, lda_1da3e45afeaa__optimizeDocConcentration -> true, lda_1da3e45afeaa__learningDecay -> 0.51, lda_1da3e45afeaa__topicConcentration -> 1.1, lda_1da3e45afeaa__learningOffset -> 1024.0, lda_1da3e45afeaa__checkpointInterval -> 10, lda_1da3e45afeaa__featuresCol -> features, lda_1da3e45afeaa__seed -> 12345, lda_1da3e45afeaa__docConcentration -> [D@31af2961, lda_1da3e45afeaa__topicDistributionCol -> topicDistribution))
- field (class: org.apache.spark.ml.param.ParamMap, name: org$apache$spark$ml$param$ParamMap$$map, type: interface scala.collection.mutable.Map)
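To illustrate the general mechanism that people pointed to, here is a small sketch in plain Scala (no Spark, assuming Scala 2.12 or later) of how a closure can drag a non-serializable object along with it. HeavyModel, Holder, and serializes are hypothetical names, and HeavyModel is only a stand-in for something like DistributedLDAModel. Whether this is exactly what happens in my notebook is an assumption; the trace above does not confirm it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  // Hypothetical stand-in for a non-serializable object such as a model.
  class HeavyModel { val k = 320 }

  // Serializable itself, but it carries a non-serializable field.
  class Holder(val model: HeavyModel) extends Serializable {
    val interestTerms = Seq(1, 2, 3)

    // Reads the field through `this`, so the closure captures the whole Holder.
    def capturing: Int => Boolean = x => interestTerms.contains(x)

    // Copies the field into a local val first, so only the Seq is captured.
    def local: Int => Boolean = {
      val terms = interestTerms
      x => terms.contains(x)
    }
  }

  // Tries Java serialization, which is what Spark's closure check relies on.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val holder = new Holder(new HeavyModel)
    println(s"capturing closure serializes: ${serializes(holder.capturing)}")
    println(s"local-copy closure serializes: ${serializes(holder.local)}")
  }
}
```

If the same thing applies here, the usual options would be copying the values a UDF needs into local vals before defining it, or marking the model reference as @transient so it is skipped during serialization. I have not verified either against this trace.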
Thank you so much!