2

我只是在 Flink中尝试使用 Scala类型类。我定义了以下类型类接口:

trait LikeEvent[T] {
    def timestamp(payload: T): Int
}

现在,我想考虑一个这样DataSet的:LikeEvent[_]

// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)

// create instances for the raw events
object EventInstance {

    implicit val logEvent = new LikeEvent[Log] {
        def timestamp(log: Log): Int = log.ts
    }

    implicit val metricEvent = new LikeEvent[Metric] {
        def timestamp(metric: Metric): Int = metric.ts
    }
}

// add ops to the raw event classes (regular class)
object EventSyntax {

    implicit class Event[T: LikeEvent](val payload: T) {
        val le = implicitly[LikeEvent[T]]
        def timestamp: Int = le.timestamp(payload)
    }
}

以下应用程序运行良好:

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"),
  Log(1586736010, 1, "invalid login"),
  Log(1586736015, 1, "invalid login"),
  Log(1586736030, 2, "valid login"),
  Metric(1586736060, "cpu_usage", 0.8),
  Log(1586736120, 0, "end of world"),
)

// count events per hour
val eventsPerHour = events
  .map(new GetMinuteEventTuple())
  .groupBy(0).reduceGroup { g =>
    val gl = g.toList
    val (hour, count) = (gl.head._1, gl.size)
    (hour, count)
  }

eventsPerHour.print()

打印预期输出

(0,5)
(1,1)
(2,1)

但是,如果我像这样修改语法对象:

// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {

  case class Event[T: LikeEvent](payload: T) {
    val le = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)  
}

我收到以下错误:

type mismatch;
found   : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]

因此,在消息的指导下,我进行了以下更改:

val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)

之后,错误变为:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]

我不明白为什么EventSyntax2会导致这些错误,而EventSyntax编译和运行良好。为什么使用案例类包装器EventSyntax2比使用常规类更成问题EventSyntax

无论如何,我的问题是双重的:

  • 我该如何解决我的问题EventSyntax2
  • 实现我的目标最简单的方法是什么?在这里,我只是为了学习而尝试使用类型类模式,但绝对是一种更面向对象的方法(基于子类型)对我来说看起来更简单。像这样的东西:
// Define trait
trait Event {
    def timestamp: Int
    def payload: Product with Serializable // Any case class
}

// Metric adapter (similar for Log)
object MetricAdapter {

    implicit class MetricEvent(val payload: Metric) extends Event {
        def timestamp: Int = payload.ts
    }
}

然后简单地val events: DataSet[Event] = env.fromElements(...)在 main 中使用。

请注意,实现某个类型类的类列表提出了类似的问题,但它考虑了一个简单的 ScalaList而不是 Flink DataSet(或DataStream)。我的问题的重点是在 Flink 中使用类型类模式以某种方式考虑异构流/数据集,以及它是否真的有意义,或者在这种情况下是否应该明确支持常规特征并如上所述继承它。

顺便说一句,你可以在这里找到代码:https ://github.com/salvalcantara/flink-events-and-polymorphism 。

4

1 回答 1

3

简短回答:Flink 不能TypeInformation在 scala 中为通配符类型派生

长答案:您的两个问题都在问,什么是TypeInformation,它是如何使用的,以及它是如何派生的。

TypeInformation是 Flink 的内部类型系统,当数据通过网络洗牌并存储在 statebackend 中时(使用 DataStream api 时),它用于序列化数据。

序列化是数据处理中的主要性能问题,因此 Flink 包含用于常见数据类型和模式的专用序列化器。开箱即用,在其 Java 堆栈中,它支持所有 JVM 原语、Pojo、Flink 元组、一些常见的集合类型和 avro。您的类的类型是使用反射确定的,如果它与已知类型不匹配,它将回退到 Kryo。

在 scala api 中,类型信息是使用隐式派生的。scala DataSet 和 DataStream api 上的所有方法都将其泛型参数注释为隐式类型类。

def map[T: TypeInformation] 

TypeInformation可以像任何类型类一样手动提供,也可以使用从 flink 导入的宏派生。

import org.apache.flink.api.scala._

这个宏通过支持 scala 元组、scala case 类和一些常见的 scala std 库类型来装饰 java 类型堆栈。我说装饰器是因为如果您的类不是这些类型之一,它可以并且会回退到 Java 堆栈。

那么为什么版本 1 有效?

因为它是一个类型堆栈无法匹配的普通类,所以它将它解析为泛型类型并返回一个基于 kryo 的序列化器。您可以从控制台测试它并查看它返回一个泛型类型。

> scala> implicitly[TypeInformation[EventSyntax.Event[_]]]
res2: org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax.Event[_]] = GenericType<com.salvalcantara.fp.EventSyntax.Event>

版本 2 不起作用,因为它将类型识别为案例类,然后递归地TypeInformation为其每个成员派生实例。这对于通配符类型是不可能的,因为通配符类型不同Any,因此派生失败。

通常,您不应将 Flink 与异构类型一起使用,因为它无法为您的工作负载派生有效的序列化程序。

于 2020-04-18T15:15:43.617 回答