问题标签 [ksqldb]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka-streams - Kafka Streams 窗口化如何工作?
我很难理解 Kafka Streams 中的 Windowing 是如何工作的。结果似乎与我迄今为止阅读和理解的内容不一致。
我创建了一个带有支持主题的 KSQL 流。KSQL SELECT 语句中的“列”之一已被指定为主题的 TIMESTAMP。
my-stream-topic 中的记录按键 (PARTITION_KEY) 分组,并使用跳跃窗口进行窗口化
记录通过以下方式汇总
然后我通过打印到控制台
组中的第一个窗口转换为 7:00 - 7:05
当我通过控制台消费者检查 my-stream-topic 中的记录时,我看到有 2 条记录应该落在上述窗口中。但是,只有其中 1 个被聚合器拾取。
我认为 dataAgg 窗口化 KTable 将包含 1 条记录作为分组键,但聚合将使用 2 条记录来计算聚合。打印的合计值不正确。
我错过了什么?
apache-kafka - 没有从 ksql 查询中得到结果
我有一个本地运行的 kafka 集群和一个名为“my-topic”的主题,我在其中推送数据。我还运行了 ksql 服务器并且查询: SELECT*FROM "my-topic" 给了我"my-topic 不存在"。我知道这个查询是不正确的,我想知道是否有另一种查询主题的方法。
apache-kafka - ksql,在表上选择不显示任何内容
我创建了一个源主题订阅者有这样的输入消息:
然后我在上面创建了一个流和一个表:
我尝试使用 ksql 进行测试:
(当我将新的 json 放入订阅者主题时打印结果)
(当我将新的 json 放入订阅者主题时没有显示任何内容)
所以请为我澄清这个案子有什么问题?
非常感谢。
apache-kafka - KSQL:如何更改 DELIMITED FORMAT 的分隔符(逗号)?
我尝试将大量消息(350M)放入客户主题(源主题),其值格式如下
然后我就该主题制作了一些流和表格,但 ksql 支持的分隔格式只是逗号分隔符。我有一些问题:
- 有什么方法可以配置 ksql 可以理解我的格式吗?或者我必须通过 ksql 转换为默认格式(逗号分隔符)
- 从上面源主题的原始值,这个命令如何将值映射到表列?还是我必须将格式转换为 json?CREATE STREAM (sub_id BIGINT, contract_id BIGINT, cust_id BIGINT, account_id BIGINT,telecom_service_id BIGINT, isdn BIGINT, imsi BIGINT) \ WITH (KAFKA_TOPIC='customer', VALUE_FORMAT='DELIMITED');
谢谢。
apache-kafka - Ksql, GROUP BY 返回 ServerError:java.lang.NullPointerException
我正在使用 confluent 4.1.1 ,我尝试了几个聚合命令,但总是使用 GROUP BY 返回 NPE:
请帮我!
list - ksql中的可变长度列表
在 KSQL 中,可以EXTRACTJSONFIELD
用于嵌套结构,但我不知道如何处理可变长度列表。例如:
我可以quux
作为基本流的 varchar 处理,
但我希望能够把它变成一张桌子:
如何处理 KSQL 中的可变长度列表?
java - 在kafka流中用另一个替换json对象
我正在研究 Kafka Streams。我面临以下问题:
到目前为止我所做的详细信息:
我创建了以下主题、流和表格:
为上述创建的主题创建表和流。
我可以看到如下数据:
通过加入上面创建的表和流来创建另一个流。
我可以看到 CUST_ADDR_SRC 流中的数据,如下所示:
我的问题:
- 现在我想用 addressId 1(Detroit) 替换 addressId 1(Fremont)。我怎样才能做到这一点?
- 如票证中所述,我还尝试将流输入打印到控制台
这是我的代码:
我没有看到输出。
只有,我可以看到以下输出:
12:04:42.145 [StreamThread-1] 调试 org.apache.kafka.clients.consumer.internals.Fetcher - 将分区 cust_addr_src-0 的偏移量重置为最新偏移量。12:04:42.145 [StreamThread-1] 调试 org.apache.kafka.clients.NetworkClient - 在 hsharma-mbp15.local:9092 处启动到节点 0 的连接。12:04:42.145 [StreamThread-1] 调试 org.apache.kafka.common.metrics.Metrics - 添加了名为 node-0.bytes-sent 的传感器 12:04:42.145 [StreamThread-1] 调试 org.apache.kafka .common.metrics.Metrics - 添加了名为 node-0.bytes-received 12:04:42.145 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics 的传感器 - 添加了名为 node-0.latency 的传感器12:04:42.145 [StreamThread-1] 调试 org.apache.kafka.clients.NetworkClient - 已完成与节点 0 的连接 12:04:42.145 [StreamThread-1] 调试 org.apache.kafka.clients.consumer.internals。
提前致谢。
apache-kafka - 键名中的 KSQL EXTRACTJSONFIELD 空格
在 KSQL 中使用 EXTRACTJSONFIELD 时,我遇到了名称中的空格问题例如:
-- 这总是返回 NULL
我应该如何处理键名中的空格?
我努力了:
apache-kafka - 使用 Kafka KSQL 从具有给定偏移量的特定分区中选择主题的所有事件
问题:我在外部数据库中有一个表,其中包含我上次从 Kafka Bus 轮询的 kafka 事件。该表包含所有事件的复合主键 PK(主题、分区、偏移量)。
所以我可以很容易地为每个主题和分区确定最新的事件。
现在我很想做这样的选择:
当然,我希望该语句立即返回队列中当前的所有事件,并将结果写入 HDFS 文件。
我将如何使用 KSQL 做到这一点?
注意:当然,我希望将所有分区及其相应的偏移量成对放入一个数组中,并在 where 子句中使用它......这将是一个优质的解决方案。
apache-kafka - KSQL 联接和时间
我需要在“登录失败”后的 10 分钟内识别出“成功登录”的客户我创建了一个“登录失败”的 Windowed KSQL 表
我创建了一个“成功登录”流
KSQL 不允许我加入带有“窗口”表的流
这种情况有解决方法吗?