我从 Kafka 获取数据,然后反序列化Array[Byte]
使用默认解码器,之后我的 RDD 元素看起来像(null,[B@406fa9b2)
,(null,[B@21a9fe0)
但我想要具有模式的原始数据,那么我该如何实现呢?
我以 Avro 格式序列化消息。
我从 Kafka 获取数据,然后反序列化Array[Byte]
使用默认解码器,之后我的 RDD 元素看起来像(null,[B@406fa9b2)
,(null,[B@21a9fe0)
但我想要具有模式的原始数据,那么我该如何实现呢?
我以 Avro 格式序列化消息。
您必须使用适当的反序列化器解码字节,例如字符串或您的自定义对象。
如果你不进行解码,你得到[B@406fa9b2
的只是 Java 中字节数组的文本表示。
Kafka 对消息的内容一无所知,因此它将字节数组从生产者传递给消费者。
在 Spark Streaming 中,您必须对键和值使用序列化程序(引用KafkaWordCount 示例):
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
使用上述序列化程序DStream[String]
,您可以使用RDD[String]
.
但是,如果您想直接将字节数组反序列化为自定义类,则必须编写自定义Serializer(这是 Kafka 特定的,与 Spark 无关)。
我建议使用带有固定架构或 Avro 的 JSON(使用Kafka、Spark 和 Avro - 第 3 部分,生产和使用 Avro 消息中描述的解决方案)。
然而,在结构化流中,管道可能如下所示:
val fromKafka = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
select('value cast "string") // <-- conversion here