I receive the following messages through a Logstash → Kafka → S3 pipeline, and I am using Hudi DeltaStreamer with an Avro schema to consume the data from Kafka.
Sample data (Parquet output rendered as JSON):
{
  "_hoodie_commit_time": "20210802155413",
  "_hoodie_commit_seqno": "20210802155413_0_1",
  "_hoodie_record_key": "068794074199",
  "_hoodie_partition_path": "068794074199",
  "_hoodie_file_name": "896d-0_0-22-27_20210802155413.parquet",
  "host": {
    "hostname": "ip-188",
    "os": {
      "kernel": "3.13.0-125-generic",
      "codename": "trusty",
      "name": "Ubuntu",
      "family": "debian",
      "version": "14.04.6 LTS, Trusty Tahr",
      "platform": "ubuntu"
    },
    "containerized": false,
    "name": "ip-188",
    "id": "4bbb8f",
    "architecture": "x86_64"
  },
  "message": "{\"message_json\": \"H4sIAAAAAAAAAO1ce28gKBQHv1cA\\nAA==\\n\", \"curr_dt\": \"2021-08-01 14:25:18.389413\", \"current_devic\": \"3559240cb4b7fa03\", \"batch_datetim\": \"2021-01-25 11:11:50.602\", \"sync_final_background\": 1}",
  "tags": {
    "array": [
      "beats_input_codec_plain_applied"
    ]
  }
}
While I can deserialize most of the fields (using a file-based Avro schema), I cannot do so for "message": I am currently mapping it to the "string" type and extracting the raw value. The embedded "message_json" is base64-encoded, and I would like to decode that data as well. My record key should be the combination of curr_dt + current_devic.

Hudi DeltaStreamer has a --transformer-class option that accepts a custom subclass of org.apache.hudi.utilities.transform.Transformer, which operates on a Spark "Dataset" data structure. But how do I parse the "message" string as JSON and decode "message_json"?
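Independent of how the transformer is wired into DeltaStreamer, the decoding step itself can be isolated. The "H4sIA…" prefix on message_json is the base64 encoding of the gzip magic bytes, so the payload appears to be gzip-compressed and then base64-encoded. A minimal, self-contained Java sketch of that decode (the regex extraction is only a stand-in for a real JSON parser; inside a Spark Transformer you would use from_json or a JSON library instead):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class MessageJsonDecoder {

    // Decode a base64-encoded, gzip-compressed payload back to a UTF-8 string.
    static String decodeMessageJson(String b64) throws Exception {
        // The MIME decoder tolerates the embedded "\n" seen in the sample data.
        byte[] gzipped = Base64.getMimeDecoder().decode(b64);
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) > 0) out.write(buf, 0, n);
            return out.toString("UTF-8");
        }
    }

    // Pull "message_json" out of the outer JSON string. A regex stands in here
    // for a proper JSON parser, purely to keep this sketch dependency-free.
    static String extractMessageJson(String message) {
        Matcher m = Pattern.compile("\"message_json\"\\s*:\\s*\"([^\"]*)\"").matcher(message);
        if (!m.find()) throw new IllegalArgumentException("no message_json field");
        return m.group(1).replace("\\n", "\n"); // un-escape the JSON "\n" sequences
    }

    public static void main(String[] args) throws Exception {
        // Build a sample payload the same way the upstream appears to: gzip, then base64.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("{\"inner\":\"payload\"}".getBytes(StandardCharsets.UTF_8));
        }
        String b64 = Base64.getEncoder().encodeToString(bos.toByteArray());
        String message = "{\"message_json\": \"" + b64 + "\", \"curr_dt\": \"2021-08-01 14:25:18\"}";

        // Round-trips back to the original inner JSON.
        System.out.println(decodeMessageJson(extractMessageJson(message)));
    }
}
```

The same decode body, wrapped in a Spark UDF, is what a custom Transformer would apply to the parsed message column.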
Spark job:
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field host.id \
--target-base-path /user/hive/warehouse/test_cow \
--target-table test_cow \
--props /var/demo/config/kafka-source.properties \
--hoodie-conf hoodie.datasource.write.recordkey.field=host.id \
--hoodie-conf hoodie.datasource.write.partitionpath.field=host.id \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
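For context, wiring a custom transformer into this command would look roughly like the fragment below (only the additions are shown; com.example.MessageJsonTransformer is a hypothetical class name for a compiled Transformer subclass shipped in a jar, and the composite record key only works once the transformer has surfaced curr_dt and current_devic as top-level columns):

```shell
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  ... \
  --transformer-class com.example.MessageJsonTransformer \
  --hoodie-conf hoodie.datasource.write.recordkey.field=curr_dt,current_devic \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
```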
Avro schema:
{
  "type": "record",
  "name": "test",
  "fields": [
    {
      "name": "host",
      "type": {
        "name": "hostrecord",
        "type": "record",
        "fields": [
          { "name": "hostname", "type": "string" },
          {
            "name": "os",
            "type": {
              "name": "osrecord",
              "type": "record",
              "fields": [
                { "name": "kernel", "type": "string" },
                { "name": "codename", "type": "string" },
                { "name": "name", "type": "string" },
                { "name": "family", "type": "string" },
                { "name": "version", "type": "string" },
                { "name": "platform", "type": "string" }
              ]
            }
          },
          { "name": "containerized", "type": "boolean" },
          { "name": "ip", "type": { "type": "array", "name": "iparray", "items": "string" } },
          { "name": "name", "type": "string" },
          { "name": "id", "type": "string" },
          { "name": "architecture", "type": "string" }
        ]
      }
    },
    { "name": "message", "type": "string" },
    { "name": "tags", "type": { "type": "array", "name": "tagsarray", "items": "string" } }
  ]
}
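If a transformer does parse "message" into its components, the target schema would model it as a nested record instead of a plain string. A sketch of that field using only the names visible in the sample (types guessed from the sample values, so this is an assumption, not a verified schema):

```
{
  "name": "message",
  "type": {
    "name": "messagerecord",
    "type": "record",
    "fields": [
      { "name": "message_json", "type": "string" },
      { "name": "curr_dt", "type": "string" },
      { "name": "current_devic", "type": "string" },
      { "name": "batch_datetim", "type": "string" },
      { "name": "sync_final_background", "type": "long" }
    ]
  }
}
```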