0

使用 KSQL-CLI,带有消息是 JSON 对象的 kafka 主题。我希望在不声明详尽的 STRUCT 或 MAP 字段定义的情况下提取诸如 obj.updaterId 之类的字段。

{
  "id": "5ba8f7e6b93c7964efb00f48",
  "source": "ShareService",
  "obj": {
    "updaterId": "systems@test.com",
    "desc": "foobar",
    "name": "com.test.auto.sensor"
  }
}

我可以通过多种方式成功创建 Stream,最简单的是:

CREATE STREAM objs1 (obj VARCHAR) WITH (kafka_topic='json-topic', value_format='JSON');

简单的选择按预期工作,您可以看到 obj 的内容...

SELECT *  FROM objs1;

1537804190394 | "5ba8f7e6b93c7964efb00f48" | {name=com.test.auto.sensor, updaterId=systems@test.com, desc=foobar}

在这里不起作用的是使用 EXTRACTJSONFIELD 从 obj 中提取 JSON 字段的任何尝试。对顶级对象和嵌套对象的响应都是“null”。

SELECT EXTRACTJSONFIELD(obj, '$.updaterId') AS updater FROM objs1;

无效的

ksql 文档中有一条注释说,如果数据是 STRING 列中的实际对象,我可以使用 STRUCT 代替。它并没有说我必须使用 STRUCT。

顺便说一句,使用 STRUCT 确实有效,但我对 EXTRACTJSONFIELD 感兴趣,因为我的消息的深层结构会有所不同。换句话说,如果消息不包含深层结构,有时会出现空响应。

作品:

CREATE STREAM objs1 (obj STRUCT<updaterId VARCHAR>) WITH (kafka_topic='json-topic', value_format='JSON');
SELECT OBJ->updaterId AS updater FROM OBJS1;

我发誓我在其他人的问题中看到了其他似乎适用于类似安排的示例。我错过了什么?

注意:我为这篇文章简化了我的 JSON。它更大,更嵌套的 IRL,但我相信这个更简单的例子是准确的。

OSX 上的 KSQL 版本 5.0.0。

4

2 回答 2

1

看起来 JSON 解析器ksql 问题 #1562在最新版本中发生了重大变化,它从 VARCHAR 列中的 JSON 内容中去除引号(从我的示例中可以看到),这导致 JSON 解析器无法找到该字段名称。

该问题建议使用 STRUCT 而不是 EXTRACTJSONFIELD(与我上面的工作示例一样)。

这似乎不适合我的用例,因为我的深层字段名称可能会改变。将对此进行更多研究并更新。

于 2018-09-26T19:03:24.387 回答
1

ksql 问题 #1562已解决,您应该能够EXTRACTJSONFIELD像以前一样使用函数。请注意,您现在需要使用 master 的最新版本才能拥有此功能。

于 2018-09-27T22:36:43.413 回答