使用 KSQL-CLI,带有消息是 JSON 对象的 kafka 主题。我希望在不声明详尽的 STRUCT 或 MAP 字段定义的情况下提取诸如 obj.updaterId 之类的字段。
{ "id": "5ba8f7e6b93c7964efb00f48", "source": "ShareService", "obj": { "updaterId": "systems@test.com", "desc": "foobar", "name": "com.test.auto.sensor" } }
我可以通过多种方式成功创建 Stream,最简单的是:
CREATE STREAM objs1 (obj VARCHAR) WITH (kafka_topic='json-topic', value_format='JSON');
简单的选择按预期工作,您可以看到 obj 的内容...
SELECT * FROM objs1;
1537804190394 | "5ba8f7e6b93c7964efb00f48" | {name=com.test.auto.sensor, updaterId=systems@test.com, desc=foobar}
在这里不起作用的是使用 EXTRACTJSONFIELD 从 obj 中提取 JSON 字段的任何尝试。对顶级对象和嵌套对象的响应都是“null”。
SELECT EXTRACTJSONFIELD(obj, '$.updaterId') AS updater FROM objs1;
无效的
ksql 文档中有一条注释说,如果数据是 STRING 列中的实际对象,我可以使用 STRUCT 代替。它并没有说我必须使用 STRUCT。
顺便说一句,使用 STRUCT 确实有效,但我对 EXTRACTJSONFIELD 感兴趣,因为我的消息的深层结构会有所不同。换句话说,如果消息不包含深层结构,有时会出现空响应。
作品:
CREATE STREAM objs1 (obj STRUCT<updaterId VARCHAR>) WITH (kafka_topic='json-topic', value_format='JSON');
SELECT OBJ->updaterId AS updater FROM OBJS1;
我发誓我在其他人的问题中看到了其他似乎适用于类似安排的示例。我错过了什么?
注意:我为这篇文章简化了我的 JSON。它更大,更嵌套的 IRL,但我相信这个更简单的例子是准确的。
OSX 上的 KSQL 版本 5.0.0。