0

我正在尝试使用 Spark 2.1.1 中的结构化流来读取 Kafka 并解码 Avro 编码的消息。我有根据这个问题定义的 UDF 。

val sr = new CachedSchemaRegistryClient(conf.kafkaSchemaRegistryUrl, 100)
val deser = new KafkaAvroDeserializer(sr)

val decodeMessage = udf { bytes:Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }

val topic = conf.inputTopic
val df = session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", conf.kafkaServers)
    .option("subscribe", topic)
    .load()

df.printSchema()

val result = df.selectExpr("CAST(key AS STRING)", """decodeMessage($"value") as "value_des"""")

val query = result.writeStream
    .format("console")
    .outputMode(OutputMode.Append())
    .start()

但是我得到以下失败。

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type DeviceRelayStateEnum is not supported

它在这条线上失败

val decodeMessage = udf { bytes:Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }

另一种方法是为我拥有的自定义类定义编码器

implicit val enumEncoder = Encoders.javaSerialization[DeviceRelayStateEnum]
implicit val messageEncoder = Encoders.product[DeviceRead]

但是在注册 messageEncoder 时失败并出现以下错误。

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for DeviceRelayStateEnum
- option value class: "DeviceRelayStateEnum"
- field (class: "scala.Option", name: "deviceRelayState")
- root class: "DeviceRead"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:476)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)

当我尝试使用 a mapafter执行此操作时load(),出现以下编译错误。

val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])

Error:(76, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[DeviceRead])org.apache.spark.sql.Dataset[DeviceRead].
Unspecified value parameter evidence$6.
      val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
Error:(76, 26) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
      val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])

这是否意味着我不能对 Java 枚举使用结构化流?它只能与原语或案例类一起使用吗?

我阅读了一些相关的问题1 , 2 , 3,似乎有可能为类指定自定义编码器,即 UDT 在 2.1 中被删除,并且没有添加新功能。

任何帮助将不胜感激。

4

1 回答 1

1

认为您可能在当前版本的结构化流(和 Spark SQL)中要求太多。

我还不能完全理解如何以所谓的更专业的方式处理丢失编码器的问题,但是当您尝试创建Dataset枚举时会遇到同样的问题。这可能还没有得到简单的支持。

Structured Streaming 只是 Spark SQL 之上的一个流式库,并将其用于序列化-反序列化 (SerDe)。

为了使故事简短并让您继续前进(直到您找到更好的方法),我建议避免在您用来表示数据集模式的业务对象中使用枚举。

所以,我建议按照以下方式做一些事情:

val decodeMessage = udf { bytes:Array[Byte] =>
  val dr = deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead]

  // do additional transformation here so you use a custom streaming-specific class
  // Here I'm using a simple tuple to hold what might be relevant
  // You could create a case class instead to have proper names
  (dr.id, dr.value)
}
于 2017-06-23T14:01:45.107 回答