2

我正在使用 Spark 2.1 的结构化流从内容是二进制 avro 编码的 Kafka 主题中读取。

因此,设置后DataFrame

val messages = spark
  .readStream
  .format("kafka")
  .options(kafkaConf)
  .option("subscribe", config.getString("kafka.topic"))
  .load()

如果我打印这个DataFrame( messages.printSchema()) 的模式,我会得到以下信息:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- timestampType: integer (nullable = true)

这个问题应该与 avro-decoding 的问题是正交的,但是让我们假设我想以某种方式将value消息中的内容转换DataFrameDataset[BusinessObject], 通过 function Array[Byte] => BusinessObject。例如完整性,函数可能只是(使用avro4s):

case class BusinessObject(userId: String, eventId: String)

def fromAvro(bytes: Array[Byte]): BusinessObject =
    AvroInputStream.binary[BusinessObject](
        new ByteArrayInputStream(bytes)
    ).iterator.next

当然,正如 miguno 在这个相关问题中所说,我不能只用 a 应用转换DataFrame.map(),因为我需要为这样的 a 提供一个隐式编码器BusinessObject

这可以定义为:

implicit val myEncoder : Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]

现在,执行地图:

val transformedMessages : Dataset[BusinessObjecŧ] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))

但是,如果我查询新架构,我会得到以下信息:

root
 |-- value: binary (nullable = true)

而且我认为这没有任何意义,因为数据集应该使用BusinessObject案例类的 Product 属性并获得正确的值。

.schema(StructType)在阅读器中看到了一些关于 Spark SQL 使用的示例,但我不能这样做,不仅仅是因为我使用的是readStream,而是因为我实际上必须先转换列才能在这些字段中操作。

我希望告诉 Spark SQL 引擎,transformedMessagesDataset 模式是StructField带有案例类字段的。

4

1 回答 1

1

我会说你得到你所要求的。正如我今天已经解释 Encoders.kryo的那样,生成一个blob带有序列化的对象。它的内部结构对于 SQL 引擎来说是不透明的,如果不反序列化对象就无法访问。因此,您的代码所做的就是采用一种序列化格式并将其替换为另一种。

您遇到的另一个问题是您尝试将动态类型的DataFrame( Dataset[Row]) 与静态类型的对象混合使用。排除 UDT API Spark SQL 不能这样工作。您可以静态使用,DatasetDataFrame可以使用层次结构编码的对象结构struct

好消息是简单的产品类型BusinessObject应该可以正常工作而无需任何笨拙Encoders.kryo。只需跳过 Kryo 编码器定义并确保导入隐式编码器:

import spark.implicits._
于 2017-01-05T17:41:20.130 回答