
As per this question, I am applying a UDF after CountVectorizer to filter out empty vectors.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{CountVectorizer, MinHashLSH, RegexTokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.DataTypes

val tokenizer = new RegexTokenizer().setPattern("\\|").setInputCol("dataString").setOutputCol("dataStringWords")
val vectorizer = new CountVectorizer().setInputCol("dataStringWords").setOutputCol("features")

val pipelineTV = new Pipeline().setStages(Array(tokenizer, vectorizer))
val modelTV = pipelineTV.fit(dataset1)

val isNoneZeroVector = udf({v: Vector => v.numNonzeros > 0}, DataTypes.BooleanType)

val dataset1_TV = modelTV.transform(dataset1).filter(isNoneZeroVector(col("features")))
val dataset2_TV = modelTV.transform(dataset2).filter(isNoneZeroVector(col("features")))

val lsh = new MinHashLSH().setNumHashTables(20).setInputCol("features").setOutputCol("hashValues")
val pipelineLSH = new Pipeline().setStages(Array(lsh))
val modelLSH = pipelineLSH.fit(dataset1_TV) // Fails at this line

This code runs perfectly through spark-shell. However, when I run the same code through spark-submit as a driver application, it gives the following error:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.8.jar:?]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.8.jar:?]
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.8.jar:?]
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2153) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2160) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.feature.LSH.fit(LSH.scala:322) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.feature.LSH.fit(LSH.scala:298) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:153) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at scala.collection.Iterator$class.foreach(Iterator.scala:893) ~[scala-library-2.11.8.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) ~[scala-library-2.11.8.jar:?]
        at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44) ~[scala-library-2.11.8.jar:?]
        at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37) ~[scala-library-2.11.8.jar:?]
        at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:149) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at com.sheel.driver.job.XJob.ymethod(XJob.scala:170) ~[MyApplication-full.jar:1.0.0]
        at com.sheel.driver.job.XJob.zmethod(XJob.scala:135) [MyApplication-full.jar:1.0.0]
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) ~[?:1.8.0_161]
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) ~[?:1.8.0_161]
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.Task.run(Task.scala:108) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) ~[spark-core_2.11-2.2.0.jar:2.2.0]

If I remove the isNoneZeroVector UDF from the process, it works like a charm. But as per the MinHash standard, the input vector must have at least one non-zero entry.
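
For reference, here is a minimal standalone sketch of the condition the UDF checks (the vector size, indices and names below are made up purely for illustration, they are not from my data): a row whose tokens never hit the CountVectorizer vocabulary comes out as an all-zero sparse vector, which is exactly what MinHash cannot hash.

import org.apache.spark.ml.linalg.Vectors

// Illustrative vectors only: a row with no vocabulary matches becomes an all-zero sparse vector.
val emptyVec  = Vectors.sparse(5, Array.empty[Int], Array.empty[Double])
val normalVec = Vectors.sparse(5, Array(1, 3), Array(2.0, 1.0))

assert(emptyVec.numNonzeros == 0)   // rejected by MinHashLSH, so the filter must drop it
assert(normalVec.numNonzeros > 0)   // safe to hash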

So is there a different way to write a UDF for Spark ML? I have written UDFs for Spark SQL before, and they work fine with the syntax above.


1 Answer


I ran into a similar error with the isNoneZeroVector UDF from the question:

org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. 

If you redefine it as follows, it will work (there may also be a way to do this in one line):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

def isNotEmptyVector(vect: Vector): Boolean = vect.numNonzeros > 0

// The typed udf[Boolean, Vector] overload carries the input type information,
// unlike the untyped udf(f, DataTypes.BooleanType) form used in the question.
val isNoneZeroVector = udf[Boolean, Vector](isNotEmptyVector)

df.withColumn("notEmpty", isNoneZeroVector($"features"))
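
Regarding the one-line variant mentioned above: my guess is that simply dropping the explicit DataTypes.BooleanType argument would also work, because that makes Spark pick the typed udf overload that derives the input/output types from the lambda itself (untested sketch; isNoneZeroVectorOneLiner is just an illustrative name):

// Untested sketch: omitting the DataType argument selects the typed udf overload.
val isNoneZeroVectorOneLiner = udf { v: Vector => v.numNonzeros > 0 }
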
answered 2021-11-12T07:06:35.963