我有一个 pyspark 应用程序正在使用来自 Kafka 主题的消息,这些消息由org.apache.kafka.connect.json.JsonConverter
. 我正在使用融合的 Kafka JDBC 连接器来执行此操作
问题是,当我使用消息时,ID 列出现在某种编码文本中,例如“ARM=”,而它应该是数字类型。
这是我现在拥有的代码
spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)
kafka_params = {
"bootstrap.servers": "kafkahost:9092",
"group.id": "Deserialize"
}
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))
ssc.start()
ssc.awaitTermination()
我知道 createDirectStream 有一个我可以设置的 valueDecoder 参数,问题是我不知道如何使用它进行解码。我也事先知道架构,因此如果需要,我将能够创建一个。
作为参考,这是我打印出 rdd.foreach 时得到的 JSON
{
"schema": {
"type": "struct",
"fields": [
{
"type": "bytes",
"optional": False,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0"
},
"field": "ID"
},
{
"type": "string",
"optional": True,
"field": "COLUMN1"
}
],
"optional": False
},
"payload": {
"ID": "AOo=",
"COLUMN1": "some string"
}
}