问题标签 [micronaut-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
178 浏览

kotlin - Kafka Streams 因 Materialized 失败。`as`(STORE_NAME)

我正在创建一个流应用程序来丰富来自存储在表中的主题的数据。在我将物化为 api 添加之前,一切都运行良好。有人知道为什么会这样吗?我已经在网上搜索了解释这一点的资源。

它始终使用新的 kafka 实例运行。

如果我删除它,代码工作正常。

它抛出以下堆栈跟踪并终止流:

编辑:原来状态存储在测试容器测试的运行之间。所以使用KafkaStreams#cleanup方法来解决这个流状态冲突的问题。

0 投票
1 回答
166 浏览

java - Kafka 流消息被多次处理

我目前正在开发一组微服务,这些微服务通过使用 Kafka 进行通信,更具体地说是流。

在大多数情况下和在开发环境中,一切似乎都运行良好,没有任何问题,但是,在暂存环境中,我遇到了我无法理解为什么会发生的行为。

在某些情况下,应用程序会多次处理通过流使用者接收到的单个(或多个)消息。

似乎每当消息的实际处理(即应用程序逻辑)需要一些时间完成时就会发生这种情况——考虑到它涉及 I/O 和其他繁重的操作,它可能会这样做。

我对 Kafka 相当陌生,但据我了解,这与我的消费者没有足够快地提交偏移量有关,这反过来又不会将其标记为正在处理。我试图找出可能会缓解此问题的配置设置,但坦率地说,我不明白需要做什么。

作为参考,这是我的 Stream 的配置:

该应用程序分别使用 Micronaut 和 Java,最新版本和版本 11 开发。

如果有人可以提出解决此问题的方法,那将非常有帮助。

0 投票
0 回答
43 浏览

micronaut - Micronaut @KafkaListener 出错时丢弃轮询循环中的剩余消息

使用@KafkaListener注解消费消息时,如果抛出异常,则轮询循环中的剩余消息将被忽略。

这意味着要实现“至少一次”消息处理语义,需要在监听器方法中捕获所有异常。

KafkaConsumerProcessor 的这一行是轮询循环出错的地方。(这个类在过去的一个月里被重构了,但问题仍然存在)

也许 Micronaut Kafka 团队中的某个人可以告诉我这是不是故意的?我在提出 GitHub 问题和这里的问题之间纠结!我认为大多数使用侦听器的人都希望轮询循环中的下一条消息在错误时被处理。

0 投票
0 回答
106 浏览

java - 为什么有时Kafka Messaging Queue在编写集成测试时不返回任何消息?

我正在尝试在我的应用程序中为 Kafka Messaging 编写集成测试。当我运行我的测试时,有时它们会通过,有时它们会失败。当它们失败时,测试类中的以下行无法返回任何消息。我不确定为什么它有时会返回消息,有时却不会。非常感谢任何解决问题的帮助。所有配置都在 yml 文件中。

制片人

消费者

KafkaBaseTest

测试班

0 投票
2 回答
191 浏览

java - 为测试类执行 Java 提供顺序

我有三个测试类来测试我的控制器、服务和 Kafka 消息传递。当我独立运行 Kafka 消息传递时,它可以工作。但是当我一起运行我的所有测试文件时,Kafka 测试类只要它是第一个被执行的就通过,否则它会失败。

我想通过确保 Kafka 测试类始终是第一个执行来命令测试类的执行。

我怎样才能做到这一点?带测试套件??还有其他方法吗?我的应用程序是使用 Micronaut、Java 8 构建的。

0 投票
2 回答
301 浏览

apache-kafka - Micronaut Kafka:运行状况检查失败并显示“集群授权失败”

我正在尝试使用来自我组织外部的 Kafka 集群的消息,这需要身份验证。

我正在接收消息,所以可能部分正确,但我在日志中收到以下错误消息:

08:54:50.840 [kafka-admin-client-thread | adminclient-1] 错误 immhealth.indicator.HealthResult - 健康指标 [kafka] 报告异常:org.apache.kafka.common.errors.ClusterAuthorizationException:集群授权失败。

DOWN以及健康检查中的结果状态。

这是来自的kafka部分application.yaml

0 投票
0 回答
33 浏览

apache-kafka - 使用 Micronaut 侦听器重试失败的 kafka 消息

我过去使用过与 spring 应用程序的 kafka 集成,并且已经实现了重试机制。但是我们的团队决定启动一个 micronaut 服务,因为我们发现了一些不错的好处。

我添加了 micronaut-kafka 依赖项并设置了一个监听器。问题是,如果一条消息导致我的侦听器抛出异常,则会记录异常但提交偏移量。

有没有办法为每条消息配置重试次数?