这可能是一个愚蠢的问题,但我已经挣扎了很长一段时间。它确实类似于这个问题,但我无法在我的代码中应用它(模式或作为一个函数)。
我想将 flatMap(或 map)转换函数传递给函数参数,然后将其代理到实际调用 df.rdd.flatMap 方法的策略函数。我会尽力解释!
case class Order(id: String, totalValue: Double, freight: Double)
case class Product(id: String, price: Double)
... or any other case class, whatever one needs to transform a row into ...
实体类:
class Entity(path: String) = {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
this.getStrategy.flatMap[T](mapFunction)
return this
}
def save(path: String): Unit = {
... write logic ...
}
}
实体可能对其方法有不同的策略。实体策略如下:
abstract class EntityStrategy(private val entity: Entity,
private val spark: SparkSession) {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
def map[T](mapFunction: (Row) => T)
}
以及一个示例 EntityStrategy 实现:
class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
extends EntityStrategy(entity, spark) {
...
override def map[T](mapFunction: Row => T): Unit = {
val rdd = this.getData.rdd.map(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
var rdd = this.getData.rdd.flatMap(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
}
最后,我想创建一个 flatMap/map 函数并像这样调用它:
def transformFlatMap(row: Row): ArrayBuffer[Order] = {
var orders = new ArrayBuffer[Order]
var _deliveries = row.getAs[Seq[Row]]("deliveries")
_deliveries.foreach(_delivery => {
var order = Order(
id = row.getAs[String]("id"),
totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
orders += order
})
return orders
}
val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")
当然,这是行不通的。我收到关于 SparkEntityStrategy 的错误:
错误:(95, 35) T val rdd = this.getData.rdd.map(f = mapFunction) 没有可用的 ClassTag
我曾尝试(implicit encoder: Encoder: T)
在实体和策略方法中添加一个,但这是不行的。可能做错了什么,因为我是 Scala 的新手。
如果我删除“T”并传递一个实际的案例类,一切正常。