I am trying to use Structured Streaming in Spark 2.1.1 to read from Kafka and decode Avro-encoded messages. I have a UDF defined as per this question:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.functions.udf

val sr = new CachedSchemaRegistryClient(conf.kafkaSchemaRegistryUrl, 100)
val deser = new KafkaAvroDeserializer(sr)
val decodeMessage = udf { bytes: Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }
val topic = conf.inputTopic
val df = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.kafkaServers)
  .option("subscribe", topic)
  .load()
df.printSchema()
val result = df.select($"key".cast("string"), decodeMessage($"value").as("value_des"))
val query = result.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .start()
But I get the following failure:
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type DeviceRelayStateEnum is not supported
It fails on this line:
val decodeMessage = udf { bytes: Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }
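One workaround I can think of (a minimal sketch; DeviceReadFlat and decodeMessageFlat are hypothetical names, and I am assuming DeviceRead is a case class whose deviceRelayState field is an Option[DeviceRelayStateEnum], as the encoder error below suggests) is to have the UDF return a shape whose schema Spark can derive, carrying the enum as its String name:

// Hypothetical flattened shape of DeviceRead: the Java enum is carried
// as its String name, which Spark's schema derivation supports.
case class DeviceReadFlat(deviceRelayState: Option[String] /* ...plus the other DeviceRead fields... */)

val decodeMessageFlat = udf { bytes: Array[Byte] =>
  val read = deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead]
  DeviceReadFlat(read.deviceRelayState.map(_.name()))
}

That loses the enum type on the Spark side, though, so I would rather find a proper encoder-based solution.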
An alternative approach was to define encoders for the custom classes I have:
implicit val enumEncoder = Encoders.javaSerialization[DeviceRelayStateEnum]
implicit val messageEncoder = Encoders.product[DeviceRead]
But registering messageEncoder fails with the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for DeviceRelayStateEnum
- option value class: "DeviceRelayStateEnum"
- field (class: "scala.Option", name: "deviceRelayState")
- root class: "DeviceRead"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:476)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
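From what I understand, Encoders.product derives the schema recursively from the case class fields, so it is the Option[DeviceRelayStateEnum] field that breaks it. A blunt alternative might be to serialize the whole object with Kryo (a sketch, assuming DeviceRead is serializable); the trade-off is that the Dataset then stores one opaque binary column instead of typed fields:

import org.apache.spark.sql.{Encoder, Encoders}

// Kryo-serializes the entire object into a single binary column.
implicit val messageEncoder: Encoder[DeviceRead] = Encoders.kryo[DeviceRead]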
When I try to do this using a map after load(), I get the following compilation error:
val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
Error:(76, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[DeviceRead])org.apache.spark.sql.Dataset[DeviceRead].
Unspecified value parameter evidence$6.
val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
Error:(76, 26) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
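With a Kryo-based implicit like the one above in scope, the map should at least compile, since it supplies the Encoder[DeviceRead] evidence the compiler is asking for (again just a sketch; the result is a Dataset[DeviceRead] backed by a single binary value column):

implicit val messageEncoder: Encoder[DeviceRead] = Encoders.kryo[DeviceRead]
val result = df.map((row: Row) => deser.deserialize("topic", row.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])

Still, that feels like sidestepping the type system rather than real support for the type, which brings me to my actual question.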
Does this mean that I can't use Structured Streaming with Java enums at all? Is it only usable with primitives and case classes?
I read a few related questions (1, 2, 3), and it appears that specifying a custom encoder for a class is not possible, i.e., UDTs were removed in 2.1 and no new functionality was added in their place.
Any help would be greatly appreciated.