问题标签 [ksqldb]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - confluent-4.1.1 是否支持带有 ksql cli 的嵌套 avro?
我正在使用 ksql cli 处理 confluent-4.1.1。我可以在此版本中为嵌套 avro 数据格式创建流吗,因为我尝试使用版本 5 并且它工作正常,但在 confluent-4.1.1 中提到的嵌套 avro 模式中没有找到有用的链接
我试图创建流 - 创建流 new_order with(kafka_topic='transition',value_format='avro');
出现错误 -
无法从架构注册表中获取 AVRO 架构。找不到 avro 类型的正确类型:transition.Value
apache-kafka - 查找最后 5 分钟的页面浏览量
我有一个名为 page_views 的 kafka 主题和一个名为 pageviews 的流。现在我想计算最近 5 分钟查看的页面。我正在使用 ksql。试过了
和
但不工作。这是嵌套的 avro 模式。
group-by - 在confluent kafka ksql中按column_name进行分组时名称为空错误
我在 confluent-5.0.0 中遇到错误。
ksql>CREATE TABLE order_per_hour AS SELECT after->order_id,count(*) FROM transaction WINDOW SESSION(60 seconds) GROUP BY after->order_id;
名称为空
错误名称为空
after 是架构中的结构字段。没有分组依据的简单选择查询工作正常。
apache-kafka - 如何在 Kafka 或 KSQL 中修改或添加主题的键
我创建了很多没有密钥的主题,我如何修改它们并添加正确的主题?
我需要为一些希望它们正确阅读主题的连接器更改此设置
我个人使用 ksql 但我没有找到任何方法
mysql - 如何将 MySql 表数据集成到 Ksql 流或表?
我正在尝试构建从MySql到Ksql的数据管道。
用例:数据源是MySql。我在 MySql 中创建了一个表。
我在用
启动一个独立的连接器。它工作正常。
我以主题名称开始消费者,即
当我在 MySQL 表中插入数据时,我也会在消费者中得到结果。我已经创建了具有相同主题名称的 KSQL Stream。我也希望在我的 Kstream 中得到相同的结果,但是当我这样做时我没有得到任何结果
连接器配置--source-quickstart-mysql.properties
样本数据
- mysql
1.) 创建数据库:
2.) 使用数据库:
3.)创建表:
4.) 将数据插入表中:
- KSQL
1.)创建流:
2.) 选择查询:
预期产出
- 消费者
获取插入到 MySQL 表中的数据。
在这里,我在 MySQL 中获取数据。
- 数据库
应填充插入到 Mysql 表中并反映在 Kafka 主题中的 In-Stream 数据。
我在 ksql 中没有得到预期的结果
帮我处理这个数据管道。
apache-kafka - 你可以从远程主机运行 KSQL 吗?
我在集群的一个节点上运行了 confluent-ksql-server 。我们可以让 ksql 由 kafka 集群外的特定主机/机器连接吗?
PS-这是为开发人员提供ksql访问
谢谢 !
apache-kafka - 如何使用复合键从主题创建 KSQL 表?
假设我有一个温度预测数据的主题,如下:
每个条目都包含一个日期、一个城市和一个预测温度,并表示该城市在该日期的预测更新。我可以将其描述为这样的 KSQL 流:
现在,我想要一个表格来代表每个城市的当前(即最新)预测温度,以及该预测随时间变化的最小值和最大值。所需输出的示例是:
我怎样才能做到这一点?
我设法得到聚合(最小/最大)如下:
这给了我输出消息,如:
但我不知道如何将它与“最新”阅读结合起来。
apache-kafka - 有没有办法将时间戳添加到 KSQL 流选择?
我有一些使用 ksql 流的消息处理,我想为每个处理的行添加时间戳,以检索完成处理的时间。
最初我假设 ROWTIME 最终会得到更新,但它似乎是从最初的 Kafka 主题的消息中保留下来的。
TIMESTAMP 方法似乎用于将输入数据转换为某些特定的时间戳。
apache-kafka - kafka 流到 ktable 连接
我正在尝试加入
- KStream:从一个主题创建,该主题具有 JSON 值。我使用来自该值的两个属性重新键入流。示例值(json 的片段)。我创建了一个自定义 pojo 类并使用自定义 serdes。
{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}
键映射为:
map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))
我查看了 KStream 并打印了我使用的键和属性。看起来一切都很好。
- KTable:我从一个主题创建一个 ktable,我正在使用 python 脚本写入主题,主题的关键是
KYZ1ifHCInOctets
设备名称和指标名称的组合(从上面)。我做了一个 toStream,然后查看生成的流。键和值似乎都很好。
现在,当我进行内部连接并查看或通过/查看某个主题时,我看到键和值不匹配。加入好像不行
我通过 ksql 有完全相同的事情,但想做我自己的流应用程序。
apache-kafka - ksql - 是否可以从多个主题创建流并获得完整的事件有效负载?
我们需要监听多个主题,并在每个主题的事件中寻找特定的字段。每个主题事件都是 json 格式,并且保证很少有 json 格式的固定字段。需要从所有这些多个主题中过滤事件,并在每个事件负载中查找特定字段。如果此字段值匹配某种格式,则将这些事件从不同主题发送到一个固定主题,该主题可以由另一个消费者进一步处理。
正在寻找 ksql 是否可以在这种情况下提供帮助 - 我们从多个主题创建一个流,并根据 ksql 流中的固定列过滤数据并将其推送到新主题。我的问题是:1)是否可以从多个主题创建一个 ksql 流?2) 是否可以将主题的完整事件有效负载作为 ksql 流中的一列?
在高层次上,(使用错误的 ksql 语法),我正在寻找类似的东西