问题标签 [ksqldb]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
301 浏览

apache-kafka - 如何转换来自 KSQL 的响应 - UDF 返回 JSON 数组到列

我有一个名为 getCityStats(string city, double distance) 的自定义 UDF,它接受 2 个参数并返回一个 JSON 字符串数组(对象),如下所示

我想在 KSQL 表创建查询中将它们处理为

换句话说,KSQL 是否可以处理元组类型,其中可以处理 Json 字符串数组以在表创建查询中返回如上所示?

0 投票
2 回答
650 浏览

apache-kafka - KSQL:左连接不等于条件不满足结果

请找到以下问题并进行确认。

Step-01:根据加入条件,从表中获取值并填充。由于表 B 中没有匹配值,所有列都填充了 NULL 值。

列:B.OP_TYPE,B.DEMO_ID

与 where 条件相同的选择查询给出与预期相同的结果。

但是当我们尝试使用 Not Equal where condition 进行选择时,相应的查询没有给出正确的结果。B.OP_TYPE != 'D' - 这是包含 B.OP_TYPE 的条件为空的地方

0 投票
2 回答
1749 浏览

apache-kafka - 带有 JSON 格式消息的 KSQL EXTRACTJSONFIELD 返回 null

使用 KSQL-CLI,带有消息是 JSON 对象的 kafka 主题。我希望在不声明详尽的 STRUCT 或 MAP 字段定义的情况下提取诸如 obj.updaterId 之类的字段。

我可以通过多种方式成功创建 Stream,最简单的是:

简单的选择按预期工作,您可以看到 obj 的内容...

1537804190394 | "5ba8f7e6b93c7964efb00f48" | {name=com.test.auto.sensor, updaterId=systems@test.com, desc=foobar}

在这里不起作用的是使用 EXTRACTJSONFIELD 从 obj 中提取 JSON 字段的任何尝试。对顶级对象和嵌套对象的响应都是“null”。

无效的

ksql 文档中有一条注释说,如果数据是 STRING 列中的实际对象,我可以使用 STRUCT 代替。它并没有说我必须使用 STRUCT。

顺便说一句,使用 STRUCT 确实有效,但我对 EXTRACTJSONFIELD 感兴趣,因为我的消息的深层结构会有所不同。换句话说,如果消息不包含深层结构,有时会出现空响应。

作品:

我发誓我在其他人的问题中看到了其他似乎适用于类似安排的示例。我错过了什么?

注意:我为这篇文章简化了我的 JSON。它更大,更嵌套的 IRL,但我相信这个更简单的例子是准确的。

OSX 上的 KSQL 版本 5.0.0。

0 投票
2 回答
402 浏览

apache-kafka - KSQL 表获取新旧值

在 KSQL 中是否可以从表中流出新旧值?我们想使用一个表作为值的存储,当一个表发生变化时,会输出一个“反转”值,它是前一个值,以某种方式标记,以及新值,这样我们就可以处理下游系统中的增量?

0 投票
2 回答
693 浏览

apache-kafka - KSQL - 在 WINDOW TUMBLING 子句中更改时区

这是我的 KSQL usingWINDOW TUMBLING子句:

一些结果:

到日期时间的纪元毫秒是:

正如我们所看到的,我在 UTC+8。但无论时区如何,start日期时间不应2018-09-29 00:00:00早于 8 小时。那么它能够改变时区吗?

PS:我尝试了几个窗口大小,2018-09-30 11:33:00我完全失去了..

0 投票
1 回答
1248 浏览

apache-kafka - 如何从kafka的主题中创建具有大量JSON字段的KSQL Stream?

我将一个长 JSON 字符串传递给 kafka 主题,例如:

并希望从 kafka 主题创建包含所有字段的流,而不指定 KSQL 中的每个字段,例如:

0 投票
2 回答
123 浏览

apache-kafka - kafka ksql 提取 json 字段文字美元符号

我有一个来自 mongo CDC 连接器的数据流,但问题是流键是 JSON 字符串的形式。

例如 {"id":"{ \"$oid\" : \"5bbb0c70cd0b9c06cf06c9c1\"}"}

我知道我可以使用 extractjsonfield 方法使用 jsonpath 提取数据,但是,我不知道如何提取我尝试过的文字美元符号: $.id.$oid $.id[\$oid] $.id.*

每次我得到一个空响应,有什么想法吗?

0 投票
1 回答
755 浏览

apache-kafka - 在kafka中如何转换表格中的主题?我需要复制远程表

我确实配置了与数据库的连接并通过主题传输所有数据,因为当我运行消费者时它返回数据

如何将此主题转换为表格并将数据保存在 KSQL 中?

非常感谢

0 投票
1 回答
528 浏览

apache-kafka - Kafka查询流

我有一个业务需求,我需要有 12 小时的窗口,并且需要查询流数据。12 小时内大约有 1 亿条记录。我还需要维护所有事件的顺序。我使用 Streams API 构建了一个系统来执行此操作。音量似乎不是问题。真正的问题是企业想要搜索事件和状态商店,几乎每个状态商店。搜索不是基于键,而是基于值中的某些字段。

我尝试了 KSQL 服务器并尝试使用 25M 记录的数据集运行简单查询,并且在 8 小时的窗口内运行查询需要将近 240 秒才能完成搜索。(现在我使用的是单个节点和单个分区。)

我正在考虑的另一种方法是将 Elastic Search 连接到流和状态存储,然后对它们运行查询,但我不确定存储每个状态存储的数据是否是一个好的解决方案。

我只是想从社区那里得到意见,什么是查询具有这种容量和低响应时间要求的流的最佳方法。

我还是 Kafka 的新手,期待建议和指导。

0 投票
1 回答
1439 浏览

apache-kafka - KSQL 如何从已经包含一些消息的主题中读取(使用 msgs)数据

我已经使用 KSQL 从 Kafka 创建了一个流,用于已经包含一些消息的 Kafka 主题。但没有从该主题接收任何消息到创建的流中。消息采用 Avro 格式,并在一段时间后生成。

我想从最早的味精开始阅读。还尝试将偏移属性设置为最早但未收到任何消息。

create stream sample_transition with(topic_name='transition',value_format='avro');