问题标签 [ksqldb]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 如何转换来自 KSQL 的响应 - UDF 返回 JSON 数组到列
我有一个名为 getCityStats(string city, double distance) 的自定义 UDF,它接受 2 个参数并返回一个 JSON 字符串数组(对象),如下所示
我想在 KSQL 表创建查询中将它们处理为
换句话说,KSQL 是否可以处理元组类型,其中可以处理 Json 字符串数组以在表创建查询中返回如上所示?
apache-kafka - KSQL:左连接不等于条件不满足结果
请找到以下问题并进行确认。
Step-01:根据加入条件,从表中获取值并填充。由于表 B 中没有匹配值,所有列都填充了 NULL 值。
列:B.OP_TYPE,B.DEMO_ID
与 where 条件相同的选择查询给出与预期相同的结果。
但是当我们尝试使用 Not Equal where condition 进行选择时,相应的查询没有给出正确的结果。B.OP_TYPE != 'D' - 这是包含 B.OP_TYPE 的条件为空的地方
apache-kafka - 带有 JSON 格式消息的 KSQL EXTRACTJSONFIELD 返回 null
使用 KSQL-CLI,带有消息是 JSON 对象的 kafka 主题。我希望在不声明详尽的 STRUCT 或 MAP 字段定义的情况下提取诸如 obj.updaterId 之类的字段。
我可以通过多种方式成功创建 Stream,最简单的是:
简单的选择按预期工作,您可以看到 obj 的内容...
1537804190394 | "5ba8f7e6b93c7964efb00f48" | {name=com.test.auto.sensor, updaterId=systems@test.com, desc=foobar}
在这里不起作用的是使用 EXTRACTJSONFIELD 从 obj 中提取 JSON 字段的任何尝试。对顶级对象和嵌套对象的响应都是“null”。
无效的
ksql 文档中有一条注释说,如果数据是 STRING 列中的实际对象,我可以使用 STRUCT 代替。它并没有说我必须使用 STRUCT。
顺便说一句,使用 STRUCT 确实有效,但我对 EXTRACTJSONFIELD 感兴趣,因为我的消息的深层结构会有所不同。换句话说,如果消息不包含深层结构,有时会出现空响应。
作品:
我发誓我在其他人的问题中看到了其他似乎适用于类似安排的示例。我错过了什么?
注意:我为这篇文章简化了我的 JSON。它更大,更嵌套的 IRL,但我相信这个更简单的例子是准确的。
OSX 上的 KSQL 版本 5.0.0。
apache-kafka - KSQL 表获取新旧值
在 KSQL 中是否可以从表中流出新旧值?我们想使用一个表作为值的存储,当一个表发生变化时,会输出一个“反转”值,它是前一个值,以某种方式标记,以及新值,这样我们就可以处理下游系统中的增量?
apache-kafka - KSQL - 在 WINDOW TUMBLING 子句中更改时区
这是我的 KSQL usingWINDOW TUMBLING
子句:
一些结果:
到日期时间的纪元毫秒是:
正如我们所看到的,我在 UTC+8。但无论时区如何,start
日期时间不应2018-09-29 00:00:00
早于 8 小时。那么它能够改变时区吗?
PS:我尝试了几个窗口大小,2018-09-30 11:33:00
我完全失去了..
apache-kafka - 如何从kafka的主题中创建具有大量JSON字段的KSQL Stream?
我将一个长 JSON 字符串传递给 kafka 主题,例如:
并希望从 kafka 主题创建包含所有字段的流,而不指定 KSQL 中的每个字段,例如:
apache-kafka - kafka ksql 提取 json 字段文字美元符号
我有一个来自 mongo CDC 连接器的数据流,但问题是流键是 JSON 字符串的形式。
例如
{"id":"{ \"$oid\" : \"5bbb0c70cd0b9c06cf06c9c1\"}"}
我知道我可以使用 extractjsonfield 方法使用 jsonpath 提取数据,但是,我不知道如何提取我尝试过的文字美元符号:
$.id.$oid
$.id[\$oid]
$.id.*
每次我得到一个空响应,有什么想法吗?
apache-kafka - 在kafka中如何转换表格中的主题?我需要复制远程表
我确实配置了与数据库的连接并通过主题传输所有数据,因为当我运行消费者时它返回数据
如何将此主题转换为表格并将数据保存在 KSQL 中?
非常感谢
apache-kafka - Kafka查询流
我有一个业务需求,我需要有 12 小时的窗口,并且需要查询流数据。12 小时内大约有 1 亿条记录。我还需要维护所有事件的顺序。我使用 Streams API 构建了一个系统来执行此操作。音量似乎不是问题。真正的问题是企业想要搜索事件和状态商店,几乎每个状态商店。搜索不是基于键,而是基于值中的某些字段。
我尝试了 KSQL 服务器并尝试使用 25M 记录的数据集运行简单查询,并且在 8 小时的窗口内运行查询需要将近 240 秒才能完成搜索。(现在我使用的是单个节点和单个分区。)
我正在考虑的另一种方法是将 Elastic Search 连接到流和状态存储,然后对它们运行查询,但我不确定存储每个状态存储的数据是否是一个好的解决方案。
我只是想从社区那里得到意见,什么是查询具有这种容量和低响应时间要求的流的最佳方法。
我还是 Kafka 的新手,期待建议和指导。
apache-kafka - KSQL 如何从已经包含一些消息的主题中读取(使用 msgs)数据
我已经使用 KSQL 从 Kafka 创建了一个流,用于已经包含一些消息的 Kafka 主题。但没有从该主题接收任何消息到创建的流中。消息采用 Avro 格式,并在一段时间后生成。
我想从最早的味精开始阅读。还尝试将偏移属性设置为最早但未收到任何消息。
create stream sample_transition with(topic_name='transition',value_format='avro');