问题标签 [ksqldb]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
85 浏览

apache-kafka - confluent-4.1.1 是否支持带有 ksql cli 的嵌套 avro?

我正在使用 ksql cli 处理 confluent-4.1.1。我可以在此版本中为嵌套 avro 数据格式创建流吗,因为我尝试使用版本 5 并且它工作正常,但在 confluent-4.1.1 中提到的嵌套 avro 模式中没有找到有用的链接

我试图创建流 - 创建流 new_order with(kafka_topic='transition',value_format='avro');

出现错误 -

无法从架构注册表中获取 AVRO 架构。找不到 avro 类型的正确类型:transition.Value

0 投票
1 回答
182 浏览

apache-kafka - 查找最后 5 分钟的页面浏览量

我有一个名为 page_views 的 kafka 主题和一个名为 pageviews 的流。现在我想计算最近 5 分钟查看的页面。我正在使用 ksql。试过了

但不工作。这是嵌套的 avro 模式。

0 投票
2 回答
424 浏览

group-by - 在confluent kafka ksql中按column_name进行分组时名称为空错误

我在 confluent-5.0.0 中遇到错误。

ksql>CREATE TABLE order_per_hour AS SELECT after->order_id,count(*) FROM transaction WINDOW SESSION(60 seconds) GROUP BY after->order_id;

名称为空

错误名称为空

after 是架构中的结构字段。没有分组依据的简单选择查询工作正常。

0 投票
1 回答
834 浏览

apache-kafka - 如何在 Kafka 或 KSQL 中修改或添加主题的键

我创建了很多没有密钥的主题,我如何修改它们并添加正确的主题?

我需要为一些希望它们正确阅读主题的连接器更改此设置

我个人使用 ksql 但我没有找到任何方法

0 投票
1 回答
1009 浏览

mysql - 如何将 MySql 表数据集成到 Ksql 流或表?

我正在尝试构建从MySqlKsql的数据管道。

用例:数据源是MySql。我在 MySql 中创建了一个表。

我在用

启动一个独立的连接器。它工作正常。

我以主题名称开始消费者,即

当我在 MySQL 表中插入数据时,我也会在消费者中得到结果。我已经创建了具有相同主题名称的 KSQL Stream。我也希望在我的 Kstream 中得到相同的结果,但是当我这样做时我没有得到任何结果

连接器配置--source-quickstart-mysql.properties

样本数据

  • mysql

1.) 创建数据库:

2.) 使用数据库:

3.)创建表:

4.) 将数据插入表中:

  • KSQL

1.)创建流:

2.) 选择查询:

预期产出

  1. 消费者

获取插入到 MySQL 表中的数据。

在这里,我在 MySQL 中获取数据。

  1. 数据库

应填充插入到 Mysql 表中并反映在 Kafka 主题中的 In-Stream 数据。

我在 ksql 中没有得到预期的结果

帮我处理这个数据管道。

0 投票
1 回答
627 浏览

apache-kafka - 你可以从远程主机运行 KSQL 吗?

我在集群的一个节点上运行了 confluent-ksql-server 。我们可以让 ksql 由 kafka 集群外的特定主机/机器连接吗?

PS-这是为开发人员提供ksql访问

谢谢 !

0 投票
1 回答
1573 浏览

apache-kafka - 如何使用复合键从主题创建 KSQL 表?

假设我有一个温度预测数据的主题,如下:

每个条目都包含一个日期、一个城市和一个预测温度,并表示该城市在该日期的预测更新我可以将其描述为这样的 KSQL 流:

现在,我想要一个表格来代表每个城市的当前(即最新)预测温度,以及该预测随时间变化的最小值和最大值。所需输出的示例是:

我怎样才能做到这一点?

我设法得到聚合(最小/最大)如下:

这给了我输出消息,如:

但我不知道如何将它与“最新”阅读结合起来。

0 投票
1 回答
828 浏览

apache-kafka - 有没有办法将时间戳添加到 KSQL 流选择?

我有一些使用 ksql 流的消息处理,我想为每个处理的行添加时间戳,以检索完成处理的时间。

最初我假设 ROWTIME 最终会得到更新,但它似乎是从最初的 Kafka 主题的消息中保留下来的。

TIMESTAMP 方法似乎用于将输入数据转换为某些特定的时间戳。

0 投票
1 回答
711 浏览

apache-kafka - kafka 流到 ktable 连接

我正在尝试加入

  • KStream:从一个主题创建,该主题具有 JSON 值。我使用来自该值的两个属性重新键入流。示例值(json 的片段)。我创建了一个自定义 pojo 类并使用自定义 serdes。{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}

键映射为:

map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))

我查看了 KStream 并打印了我使用的键和属性。看起来一切都很好。

  • KTable:我从一个主题创建一个 ktable,我正在使用 python 脚本写入主题,主题的关键是KYZ1ifHCInOctets设备名称和指标名称的组合(从上面)。我做了一个 toStream,然后查看生成的流。键和值似乎都很好。

现在,当我进行内部连接并查看或通过/查看某个主题时,我看到键和值不匹配。加入好像不行

我通过 ksql 有完全相同的事情,但想做我自己的流应用程序。

0 投票
2 回答
1673 浏览

apache-kafka - ksql - 是否可以从多个主题创建流并获得完整的事件有效负载?

我们需要监听多个主题,并在每个主题的事件中寻找特定的字段。每个主题事件都是 json 格式,并且保证很少有 json 格式的固定字段。需要从所有这些多个主题中过滤事件,并在每个事件负载中查找特定字段。如果此字段值匹配某种格式,则将这些事件从不同主题发送到一个固定主题,该主题可以由另一个消费者进一步处理。

正在寻找 ksql 是否可以在这种情况下提供帮助 - 我们从多个主题创建一个流,并根据 ksql 流中的固定列过滤数据并将其推送到新主题。我的问题是:1)是否可以从多个主题创建一个 ksql 流?2) 是否可以将主题的完整事件有效负载作为 ksql 流中的一列?

在高层次上,(使用错误的 ksql 语法),我正在寻找类似的东西