I have the following Avro message in a Kafka topic.

{
"table": {
    "string": "Schema.xDEAL"
},
"op_type": {
    "string": "Insert"
},
"op_ts": {
    "string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
    "string": "2018-03-16 10:03:37.778000"
},
"pos": {
    "string": "00000000000000010722"
},
"before": null,
"after": {
    "row": {
        "DEA_PID_DEAL": {
            "string": "AAAAAAAA"
        },
        "DEA_NME_DEAL": {
            "string": "MY OGG DEAL"
        },
        "DEA_NME_ALIAS_NAME": {
            "string": "MY OGG DEAL"
        },
        "DEA_NUM_DEAL_CNTL": {
            "string": "4swb6zs4"
        }           
    }
}

}

When I run the following query, it creates the stream, but every column comes back null.

   CREATE STREAM tls_deal (DEA_PID_DEAL VARCHAR, DEA_NME_DEAL varchar, DEA_NME_ALIAS_NAME VARCHAR, DEA_NUM_DEAL_CNTL VARCHAR) WITH (kafka_topic='deal-ogg-topic',value_format='AVRO', key = 'DEA_PID_DEAL');

But when I change the Avro message to the following, it works.

 {
"table": {
    "string": "Schema.xDEAL"
},
"op_type": {
    "string": "Insert"
},
"op_ts": {
    "string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
    "string": "2018-03-16 10:03:37.778000"
},
"pos": {
    "string": "00000000000000010722"
},
"DEA_PID_DEAL": {
    "string": "AAAAAAAA"
},
"DEA_NME_DEAL": {
    "string": "MY OGG DEAL"
},
"DEA_NME_ALIAS_NAME": {
    "string": "MY OGG DEAL"
},
"DEA_NUM_DEAL_CNTL": {
    "string": "4swb6zs4"
}           

}

Now, if I run the query above, the data is populated.

My question is: if I need to populate the stream from nested fields, how can I handle this?

I could not find a solution in the KSQL documentation.

Thanks in advance. I appreciate the help. :)

2 Answers

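As Robin said, this is not currently supported (as of 22 March 2018 / v0.5). However, it is a tracked feature request. You may want to upvote or follow this GitHub issue in the KSQL repository: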

https://github.com/confluentinc/ksql/issues/638

Answered 2018-03-23T10:14:20.560

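KSQL does not currently (as of 22 March 2018 / v0.5) support nested Avro. You can use a Single Message Transform to flatten the data coming from Kafka Connect. For example, Debezium ships with UnwrapFromEnvelope.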
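
To make "flattening" concrete, here is a minimal Python sketch of the reshaping, applied to the JSON rendering of the message from the question. It is only an illustration of the target shape, not the Single Message Transform itself; the helper name `flatten_after_row` is hypothetical, and it assumes the row fields always sit under `after.row` as in the sample.

```python
import json


def flatten_after_row(message):
    """Hoist the fields under after.row to the top level (hypothetical helper)."""
    # Keep the top-level metadata fields, dropping the before/after wrappers.
    flat = {k: v for k, v in message.items() if k not in ("before", "after")}
    # "after" may be null (e.g. for deletes), so fall back to an empty dict.
    after = message.get("after") or {}
    flat.update(after.get("row") or {})
    return flat


# Trimmed copy of the nested message from the question.
nested = {
    "table": {"string": "Schema.xDEAL"},
    "op_type": {"string": "Insert"},
    "before": None,
    "after": {
        "row": {
            "DEA_PID_DEAL": {"string": "AAAAAAAA"},
            "DEA_NME_DEAL": {"string": "MY OGG DEAL"},
        }
    },
}

flat = flatten_after_row(nested)
print(json.dumps(flat, indent=2))
```

The result has `DEA_PID_DEAL` and `DEA_NME_DEAL` at the top level, which matches the second message shape in the question, the one the `CREATE STREAM` statement can read.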

Answered 2018-03-22T16:35:16.307