问题标签 [ksqldb]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
918 浏览

apache-kafka - 尝试加入表和流时出错

我正在尝试加入一个表和一个流并创建另一个表,如下所示:

但报如下错误:

这是stream_fx2流和table_fx_latest3表的描述:

我猜这可能是 KSQL 的一个错误(仍在开发人员预览中),但我想确保我没有遗漏任何东西。任何帮助将非常感激。

0 投票
1 回答
1269 浏览

apache-kafka - KSQL 表未显示数据,但具有相同结构的 Stream 返回数据

我在 KSQL 中创建了一个表,而查询它没有返回任何数据。然后,我在同一主题上创建了一个具有相同结构的流,并且能够查询数据。

我在这里想念什么。我需要这个作为加入流的表。

提前致谢。

0 投票
1 回答
404 浏览

apache-kafka - Apache Kafka Stream / KSQL 如何处理乱序/延迟消息?

据我所知,kafka 流通过保留窗口处理延迟消息以容忍延迟,例如

  1. 但是我在KSQL中找不到相关的部分,还不支持?
  2. 我可以通过 kafka 流进行准确的统计而不给出最大容忍滞后(因为我不确定)吗?就像 Apache Flink sideOutputLateData专门处理延迟消息一样。
0 投票
3 回答
14010 浏览

apache-kafka - 如何加入多个Kafka主题?

所以我有...

  • 第一个具有通用应用程序日志 (log4j) 的主题。存储诸如 HTTP API 请求/响应和警告、异常等内容……可以有多个日志与一个逻辑业务请求相关联。(这些日志在几秒钟内发生)
  • 第二个主题包含来自上述业务请求的命令,其他服务对其采取行动。(这些命令也会在几秒钟内发生,但可能距离原始请求几分钟)
  • 第三个主题包含由其他服务的操作生成的事件。(大多数事件在几秒钟内完成,但有些可能需要 3-5 天才能收到)

因此,单个逻辑业务请求可以通过微服务相互传递的 uuid 关联多个日志、命令和事件。

那么有哪些技术/模式可用于阅读这 3 个主题并将它们全部连接为一个 json 文档,然后将它们转储到 Elasticsearch 中?

流媒体?

0 投票
1 回答
438 浏览

scala - 从 scala 启动 KsqlRestApplication 并获取 NoSuchMethodError org.apache.kafka.streams.StreamsConfig.getConsumerConfigs

我正在尝试编写一个程序,使我能够在 Scala 中对 Kafka 主题运行预定义的 KSQL 操作,但我不想每次都打开 KSQL Cli。因此,我想从我的 Scala 程序中启动 KSQL“服务器”。如果我正确理解了 KSQL 源代码,我必须构建并启动一个 KsqlRestApplication:

但是当我尝试这样做时,我收到以下错误:

我查看了 BrokerCompatibilityCheck 中的函数调用,并在创建函数中调用了 StreamsConfig.getConsumerConfigs() ,并使用 2 个字符串作为参数,而不是在

https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/StreamsConfig.html#getConsumerConfigs(StreamThread,%20java.lang.String,%20java.lang.String)

我的 KSQL 和 Kafka 版本根本不兼容还是我做错了什么?我正在使用 KSQL 版本 4.1.0-SNAPSHOT 和 Kafka 版本 1.0.0。

0 投票
1 回答
505 浏览

apache-kafka - KSQL 左连接不起作用

我是stackoverflow的新手,所以如果我有什么不对,请在这里发布这个问题。

我试图找到答案,但在网站上找不到与 KSQL JOIN 相关的问题,所以我发布了这个。我尝试了不同的方法来运行这个查询,但我一直得到空指针异常,所以在这里发布。

我有 2 个 kafka avro 主题交易和费用,但数据有很多空格来清除我创建了以下主题和带有修剪数据的表格。DEAL_STREAMEXPENSE_TABLE

结果:

结果:

当我执行以下查询时,它给了我空指针异常。我尝试了以下查询。

1:

2:

3:

错误:

0 投票
2 回答
427 浏览

apache-kafka - 在 docker 中运行 ksql 的问题

我在 Kubernetes 集群的容器中运行了 confluent kafka、zookeeper、schema-registry 和 ksql。Kafka、zookeeper 和模式注册表工作正常,可以创建主题并以 Avro 格式写入数据,但是当我尝试检查 ksql 并使用 curl 创建一些流时,例如:

我得到错误:

请在下面找到我的 ksql 服务器配置:

我也尝试在没有 schema.registry 字符串的情况下启动服务器,但没有运气

0 投票
1 回答
361 浏览

apache-kafka - 在 ksql 中转换数据

我正在尝试将数据从一种格式转换为另一种格式(一种模式到另一种模式)。

例子 :

我想把这个有效载荷转换成另一种形式让我们说

考虑到 data( payload) 来自 Kafka,我想payload_transform在消费者中看到通过转换

可以用 ksql 吗?

更新 :

我们可以做一个级别:

我们可以添加条件吗?

例如:如果有效载荷中存在“b”键生成

否则 :

0 投票
2 回答
915 浏览

apache-kafka - How to read the nested avro fields for creating streams?

I have following AVRO message in Kafka topic.

}

When I run the following query. It creates the stream with null values.

But when I change the AVRO message to following it works.

}

Now If I run the above query the data will be populated.

My question is If I need to populate stream from nested field how can I handle this?

I am not able to find the solution in KSQL documentation page.

Thanks in advance. I appreciate the help. :)

0 投票
1 回答
50 浏览

scala - 如果不自己用脚本内容填充 ksql.schema.file.content,KsqlRestClient.makeKsqlRequest("RUN SCRIPT ...") 将无法工作

在我的 Scala 应用程序中,我试图告诉我的 KSQL Server 它应该使用该函数执行RUN SCRIPT <script>命令。KsqlRestClient.makeKsqlRequest(String ksql)每次我尝试都没有发生,即使响应成功,所以我开始调试,我看到响应成功但它也返回了某种错误,属性ksql.schema.file.content为空.

由于我没有找到任何有关如何填充此属性的文档,因此我尝试了一些方法并最终发现,如果您使用 .sql 文件的内容填充此属性,则运行脚本命令有效。

有没有人知道 ksql.schema.file.content 通常应该填充什么内容和/或我使用它的方式是你应该如何从 Scala 应用程序中执行 KSQL 脚本。我已经将 makeKsqlRequest 中的文本复制并粘贴到 KSQL CLI 中,一切正常,所以应该没有错误。