3

这可能是一个愚蠢的问题,但我已经挣扎了很长一段时间。它确实类似于这个问题,但我无法在我的代码中应用它(模式或作为一个函数)。

我想将 flatMap(或 map)转换函数传递给函数参数,然后将其代理到实际调用 df.rdd.flatMap 方法的策略函数。我会尽力解释!

case class Order(id: String, totalValue: Double, freight: Double) 
case class Product(id: String, price: Double) 

... or any other case class, whatever one needs to transform a row into ...

实体类:

class Entity(path: String) = {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
      this.getStrategy.flatMap[T](mapFunction)
      return this
  }
  def save(path: String): Unit = {
      ... write logic ...
  } 
}

实体可能对其方法有不同的策略。实体策略如下:

abstract class EntityStrategy(private val entity: Entity,
                              private val spark: SparkSession) {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
  def map[T](mapFunction: (Row) => T)
}

以及一个示例 EntityStrategy 实现:

class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
  extends EntityStrategy(entity, spark) {
  ...
  override def map[T](mapFunction: Row => T): Unit = {
    val rdd = this.getData.rdd.map(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }

  override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
    var rdd = this.getData.rdd.flatMap(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }
}

最后,我想创建一个 flatMap/map 函数并像这样调用它:

def transformFlatMap(row: Row): ArrayBuffer[Order] = {
    var orders = new ArrayBuffer[Order]
    var _deliveries = row.getAs[Seq[Row]]("deliveries")
    _deliveries.foreach(_delivery => {
       var order = Order(
           id = row.getAs[String]("id"),
           totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
      orders += order
    })
   return orders
}

val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")

当然,这是行不通的。我收到关于 SparkEntityStrategy 的错误:

错误:(95, 35) T val rdd = this.getData.rdd.map(f = mapFunction) 没有可用的 ClassTag

我曾尝试(implicit encoder: Encoder: T)在实体和策略方法中添加一个,但这是不行的。可能做错了什么,因为我是 Scala 的新手。

如果我删除“T”并传递一个实际的案例类,一切正常。

4

1 回答 1

3

结果是为了满足编译器和 Spark 的方法,我需要添加以下类型标签:

[ T <: scala.Product : ClassTag : TypeTag]

所以这两种方法都变成了:

def map[T <: Product : ClassTag : TypeTag](mapFunction: (Row) => T): Entity
def flatMap[T <: scala.Product : ClassTag : TypeTag](mapFunction: (Row) => TraversableOnce[T]): Entity

关于scala.Product :

所有产品的基本特征,在标准库中至少包括 scala.Product1 到 scala.Product22 以及它们的子类 scala.Tuple1 到 scala.Tuple22。此外,所有案例类都使用综合生成的方法实现 Product。

由于我使用案例类对象作为函数的返回类型,因此我需要scala.Product以便 Spark 的createDataFrame可以匹配正确的重载。

为什么同时使用ClassTagTypeTag

通过删除TypeTag,编译器会抛出以下错误:

错误:(96, 48) T this.dataFrame = this.spark.createDataFrame(rdd) 没有可用的 TypeTag

并删除ClassTag

错误:(95, 35) T val rdd = this.getData.rdd.map(f = mapFunction) 没有可用的 ClassTag

添加它们使两种方法都满意,并且一切都按预期工作。

找到了一篇很好的文章,解释了 Scala 中的类型擦除。

于 2019-02-23T14:44:17.853 回答