
I am trying to run FPGrowth, but I am running into a problem with the input type. Given this code:

%scala
// association rule learning for OFFLINE with FPGrowth from MLlib
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.mllib.fpm.PrefixSpan
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.FlatMapFunction
import org.apache.spark.mllib.linalg.Vectors

val dfoffline = spark.table("offlinetrx")
val products = dfoffline
  .groupBy("Beleg")
  .agg(collect_set("Produkt") as "items")

// debugging
val columnProducts = products.select("items")
columnProducts.printSchema()
columnProducts.show()

This produces the following output:

root
|-- items: array (nullable = true)
|    |-- element: string (containsNull = true)

+--------------------+
|               items|
+--------------------+
|[19420.01, 46872.01]|
|[AEC003.01, AEC00...|
|  [BT102.01, BET103]|

The code then goes on to convert to an RDD and run FPGrowth:

val rdd = columnProducts.rdd
val fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(6)
val model = fpg.run(rdd)

Spark then tells me:

error: inferred type arguments [Nothing,org.apache.spark.sql.Row] do
not conform to method run's type parameter bounds [Item,Basket <:
Iterable[Item]]

val model = fpg.run(rdd)
notebook:74: error: type mismatch;
found   : org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row]
required: org.apache.spark.api.java.JavaRDD[Basket]

Since run evidently cannot treat a Row as a basket, I then tried to map the DataFrame:

val rdd = columnProducts.map{x:Row => x.getAs[List](0)}

But this leads to another problem:

error: kinds of the type arguments (List) do not conform to the
expected kinds of the type parameters (type T).
List's type parameters do not match type T's expected parameters:
type List has one type parameter, but type T has none
val rdd = columnProducts.map{x:Row => x.getAs[List](0)}

How do I get the type parameter (T) for the getAs call right?
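
From the kind error I gather that List on its own is not a valid type argument, because List itself still expects an element type. My current guess, assuming the array elements are strings, is the sketch below (I picked Seq[String] rather than List[String] because I am not sure which concrete collection backs Spark's array column at runtime, and I map over .rdd so the result does not need an implicit Encoder):

// Guess: give getAs a fully applied type such as Seq[String].
// Mapping over .rdd avoids needing an Encoder for the result type.
val rdd = columnProducts.rdd.map { x: Row => x.getAs[Seq[String]](0) }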

Or does anyone have another good idea for how to actually solve the problem of needing an RDD of baskets while having an RDD of Rows?
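
For reference, this is the direction I am currently experimenting with. It assumes the Scala overload of FPGrowth.run expects an RDD[Array[Item]] and that my items are strings; transactions is just a name I made up:

// Sketch: convert each Row's "items" column (array<string>) into an
// Array[String] so the data matches run's expected RDD[Array[Item]].
val transactions: RDD[Array[String]] =
  columnProducts.rdd.map(row => row.getAs[Seq[String]]("items").toArray)

val model = fpg.run(transactions)

If that conversion is the wrong approach, pointers to the intended one would be very welcome.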

Thanks, everyone.
