5

我将一个长 JSON 字符串传递给 kafka 主题,例如:

{
    "glossary": {
        "title": "example glossary",
        "GlossDiv": {
            "title": "S",
            "GlossList": {
                "GlossEntry": {
                    "ID": "SGML",
                    "SortAs": "SGML",
                    "GlossTerm": "Standard Generalized Markup Language",
                    "Acronym": "SGML",
                    "Abbrev": "ISO 8879:1986",
                    "GlossDef": {
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                        "GlossSeeAlso": ["GML", "XML"]
                    },
                    "GlossSee": "markup"
                }
            }
        }
    }
}

并希望从 kafka 主题创建包含所有字段的流,而不指定 KSQL 中的每个字段,例如:

 CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');
4

1 回答 1

7

如果您希望 KSQL 自动拾取字段名称,则需要使用 Avro。如果您使用 Avro,则数据的架构会在 Confluent Schema Registry 中注册,当您使用主题时,KSQL 会自动检索它。

如果你使用 JSON,你必须告诉 KSQL 列是什么。您可以在CREATE STREAM语句中使用STRUCT嵌套元素的数据类型来执行此操作。

您可以通过仅在中声明高级字段CREATE STREAM然后使用EXTRACTJSONFIELD您要使用的字段访问嵌套元素来列出所有字段。请注意,5.0.0 中存在问题,该问题将在 5.0.1 中修复。此外,您不能将其用于您显示的示例数据中的嵌套数组等。

于 2018-10-02T09:40:43.163 回答