I receive the following messages through a Logstash -> Kafka -> S3 pipeline. I am using Hudi DeltaStreamer with an Avro schema to consume the data from Kafka.

Sample data (the Parquet output rendered as JSON):

{
  "_hoodie_commit_time": "20210802155413",
  "_hoodie_commit_seqno": "20210802155413_0_1",
  "_hoodie_record_key": "068794074199",
  "_hoodie_partition_path": "068794074199",
  "_hoodie_file_name": "896d-0_0-22-27_20210802155413.parquet",
  "host": {
    "hostname": "ip-188",
    "os": {
      "kernel": "3.13.0-125-generic",
      "codename": "trusty",
      "name": "Ubuntu",
      "family": "debian",
      "version": "14.04.6 LTS, Trusty Tahr",
      "platform": "ubuntu"
    },
    "containerized": false,
    "name": "ip-188",
    "id": "4bbb8f",
    "architecture": "x86_64"
  },
  "message": "{\"message_json\": \"H4sIAAAAAAAAAO1ce28gKBQHv1cA\\nAA==\\n\", \"curr_dt\": \"2021-08-01 14:25:18.389413\", \"current_devic\": \"3559240cb4b7fa03\", \"batch_datetim\": \"2021-01-25 11:11:50.602\", \"sync_final_background\": 1}",
  "tags": {
    "array": [
      "beats_input_codec_plain_applied"
    ]
  }
}

While I can deserialize most of the fields (using a file-based Avro schema), I cannot do so for "message" — for now I extract its value with the "string" data type. The "message_json" value inside it is base64-encoded, and I would like to decode that data as well. My record key should be the combination of curr_dt + current_devic.
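
As a sanity check, the payload does decode as base64 (standalone Java, payload copied verbatim from the sample record; PayloadCheck is just a throwaway name). The first two decoded bytes are 0x1f 0x8b, the gzip magic number, so the data appears to be gzip-compressed underneath the base64 layer:

import java.util.Base64;

public class PayloadCheck {
  public static void main(String[] args) {
    // Truncated base64 payload from the sample "message_json" value.
    String b64 = "H4sIAAAAAAAAAO1ce28gKBQHv1cA\nAA==\n";
    // The MIME decoder tolerates the embedded newlines.
    byte[] raw = Base64.getMimeDecoder().decode(b64);
    // Prints "first bytes: 1f 8b" — the gzip magic number.
    System.out.printf("first bytes: %02x %02x%n", raw[0], raw[1]);
  }
}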

Hudi DeltaStreamer has a --transformer-class option that accepts a custom subclass of org.apache.hudi.utilities.transform.Transformer, which operates on Spark Dataset<Row> structures, but how do I parse the "message" string into JSON and decode "message_json"?
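
From reading the docs, a minimal sketch of the transformer I have in mind is below (the package/class name com.example.hudi.MessageJsonTransformer and the new column names are my own; the TypedProperties import path may differ between Hudi versions):

package com.example.hudi;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.*;

public class MessageJsonTransformer implements Transformer {

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                            Dataset<Row> rowDataset, TypedProperties properties) {
    // Shape of the JSON document carried inside the "message" string,
    // taken from the sample record above.
    StructType messageSchema = new StructType()
        .add("message_json", DataTypes.StringType)
        .add("curr_dt", DataTypes.StringType)
        .add("current_devic", DataTypes.StringType)
        .add("batch_datetim", DataTypes.StringType)
        .add("sync_final_background", DataTypes.LongType);

    return rowDataset
        // Parse the "message" string into a struct column.
        .withColumn("msg", from_json(col("message"), messageSchema))
        // Base64-decode message_json; unbase64 yields binary, so cast back to
        // string. (The sample payload starts with "H4sI", the base64 of the
        // gzip magic bytes, so a further gunzip step via a UDF may be needed.)
        .withColumn("message_json_decoded",
            unbase64(col("msg.message_json")).cast(DataTypes.StringType))
        // Composite record key: curr_dt + current_devic.
        .withColumn("record_key",
            concat_ws("_", col("msg.curr_dt"), col("msg.current_devic")));
  }
}

If this is the right approach, I would wire it in by adding --transformer-class com.example.hudi.MessageJsonTransformer to the spark-submit below and pointing hoodie.datasource.write.recordkey.field at the new record_key column.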

Spark job:

spark-submit  \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
    --table-type COPY_ON_WRITE \
    --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
    --source-ordering-field host.id \
    --target-base-path /user/hive/warehouse/test_cow \
    --target-table test_cow \
    --props /var/demo/config/kafka-source.properties \
    --hoodie-conf hoodie.datasource.write.recordkey.field=host.id \
    --hoodie-conf hoodie.datasource.write.partitionpath.field=host.id \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

Avro schema:

{
  "type": "record",
  "name": "test",
  "fields": [
    {
      "name": "host",
      "type": {
        "name": "hostrecord",
        "type": "record",
        "fields": [
          {
            "name": "hostname",
            "type": "string"
          },
          {
            "name": "os",
            "type": {
              "name": "osrecord",
              "type": "record",
              "fields": [
                {
                  "name": "kernel",
                  "type": "string"
                },
                {
                  "name": "codename",
                  "type": "string"
                },
                {
                  "name": "name",
                  "type": "string"
                },
                {
                  "name": "family",
                  "type": "string"
                },
                {
                  "name": "version",
                  "type": "string"
                },
                {
                  "name": "platform",
                  "type": "string"
                }
              ]
            }
          },
          {
            "name": "containerized",
            "type": "boolean"
          },
          {
            "name": "ip",
            "type": { "type": "array", "name": "iparray", "items": "string" }
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "id",
            "type": "string"
          },
          {
            "name": "architecture",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "message",
      "type": "string"
    },
    {
      "name": "tags",
      "type": { "type": "array", "name": "tagsarray", "items": "string" }
    }
  ]
}