1

我正在尝试将数据从一种格式转换为另一种格式(一种模式到另一种模式)。

例子 :

payload = {
    'a' : 'a1',
    'b' : 'b1'
}

我想把这个有效载荷转换成另一种形式让我们说

payload_transform = {
   'a':{
      'b' : 'b1'
    }
    'c' : 'a1'
}

考虑到 data( payload) 来自 Kafka,我想payload_transform在消费者中看到通过转换

可以用 ksql 吗?

更新 :

我们可以做一个级别:

payload = {
    'a' : 'a1',
    'b' : 'b1'
}

payload = {
    'confluent' : 'a1',
    'b' : 'b1'
}

我们可以添加条件吗?

例如:如果有效载荷中存在“b”键生成

payload = {
        'confluent' : 'a1',
        'b' : 'b1'
    }

否则 :

payload = {
        'kafka' : 'a1',
        'b' : 'b1'
    }
4

1 回答 1

3

虽然 KSQL 确实支持取消嵌套 JSON(带有EXTRACTJSONFIELD),但它目前(2018 年 3 月/版本 0.5)不支持构建嵌套结构。它目前也不支持嵌套的 Avro。

更新了对更新问题的回复

  • 您可以重命名字段,只需使用 SQLAS子句:

    SELECT A AS NEW_COL, B FROM INPUT_STREAM

你能描述更多关于你在这里尝试做的事情吗?在您给出的示例中,有条件地重命名字段没有意义。也许也可以尝试一下 KSQL,看看什么对您有用。

于 2018-03-21T09:04:53.243 回答