问题标签 [spring-kafka-test]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
804 浏览

spring - Spring kafka单元测试侦听器未订阅主题

我有一个用 kafka 探索 spring 的示例项目(在这里找到)。我有一个订阅主题my-test-topic-upstream的听众,它只会丢失消息和密钥并将其发布到另一个主题my-test-topic-downstream。我试过这是本地 kafka (docker-compose文件在那里),它可以工作。

现在我正在尝试使用嵌入式 kafka 服务器为此编写测试。在测试中,我有一个嵌入式服务器正在启动(TestContext.java),它应该在测试之前启动(覆盖 junit beforeAll)。

然后我创建一个生产者(TickProducer)并向我希望我的听众能够消费的主题发布一条消息。

我看到以下日志消息继续记录。

最后失败了

这里有什么提示吗?

0 投票
3 回答
1437 浏览

java - Spring Kafka 测试 - 没有在 @KafkaListener 中使用 EmbeddedKafka 接收数据

我们正在使用 Cucumber 对 out 应用程序进行一些集成测试,并且在测试@KafkaListener. 我们设法使用 EmbeddedKafka 并在其中生成数据。

但是消费者从来没有收到任何数据,我们也不知道发生了什么。

这是我们的代码:

生产者配置

消费者配置

集成测试

步骤定义

消费者 同样的代码在生产 Bootstrap 服务器上运行良好,但它从未在嵌入式 Kafka 中使用

日志中的所有内容看起来都很好,但我们不知道缺少什么。

提前致谢。

0 投票
1 回答
219 浏览

spring-boot - 在 Kafkalistener 集成测试中捕获异常

所以我需要为我的 kafkalistener 方法创建一个集成测试,其中测试期望 ListenerExecutionFailedException 实际上被抛出,因为消息在消费期间由于另一个服务处于非活动状态而失败。

下面是测试代码,我使用 Embeddedkafkabroker 作为生产者和消费者:

我想知道的是异常被认为没有被抛出并且测试失败,即使我知道消息确实被侦听器接收并且异常被抛出,因为我在日志上看到它们。

即使我将预期更改为 Throwable 似乎也没有检测到异常。

我应该怎么做才能让Junit检测到异常?

另外,另一个有趣的事情是我试图模拟在侦听器中调用的服务类并返回一些虚拟值,但是当我使用 Mockito.verify 时没有调用该服务

0 投票
1 回答
4471 浏览

java - 编写customConsumerFactory和customKafkaListenerContainerFactory时未自动加载spring kafka属性

我想从 application.properties 加载我的 spring-kafka 属性,并且必须使用 spring 自动配置加载。我的问题是由以下原因引起的:java.lang.IllegalStateException:没有可用的确认作为参数,侦听器容器必须有一个手动确认模式来填充确认但是我已经在属性文件 spring.kafka.listener.ack-mode= 中设置了它但是,在此属性中手动立即,因为它是我的自定义 fooKafkaListenerContainerFactory 它无法选择此设置。我想要的是不手动设置它应该从我的application.properies中获取。@Gary Russell 感谢您的帮助。

我的代码如下所示

0 投票
0 回答
2285 浏览

apache-kafka - 使用 Kafka 进行模拟单元测试 - 回调模拟生产者

我想围绕调用 Kafka 进行一些测试。我的 Kafka 调用有以下回调设置,见下文:

我想做的是测试这两种情况(onSuccess 和 onFailure)。我尝试关注这个SO question,但我遇到了一个错误,它说“主题不能为空”。这是我尝试过的:

我不太确定这意味着什么,所以几个小时后,我决定尝试不同的路径——使用MockProducer类。它似乎有效,但并非始终如一:

因此,我尝试将其添加到其中,但它抱怨我没有嘲笑正确的事情:

我的问题是,当我发送 Kafka 请求时,是否有人知道我在测试回调的 onFailure 方法时做错了什么?

谢谢!

0 投票
1 回答
129 浏览

spring - AggregatingReplyingKafka模板发布策略问题

当我将 AggregatingReplyingKafkaTemplate 与 template.setReturnPartialOnTimeout(true) 一起使用时,似乎存在一个问题,即使消费者可以获得部分结果,它也会返回超时异常。

在下面的示例中,我有 3 个消费者来回复请求主题,并且我将回复超时设置为 10 秒。我已经明确地将消费者 3 的响应延迟到 11 秒,但是,我希望消费者 1 和 2 的响应能够返回,所以我可以返回部分结果。但是,我收到了 KafkaReplyTimeoutException。感谢您的投入。谢谢。

我根据下面的单元测试遵循代码。[回复KafkaTemplateTests][1]

我在下面提供了实际代码:

0 投票
1 回答
935 浏览

apache-kafka - 使用 @EmbeddedKafka 时的 ZooKeeperClientTimeoutException

将@EmbeddedKafka 添加到集成测试后,我会看到以下错误:

0 投票
0 回答
842 浏览

spring - spring kafka,如何优雅地关闭spring boot应用程序

我在 Spring Boot 应用程序中有 Kafka 消费者。我将 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 保持为 false,并且我的消费者正在手动确认消息。Spring-Kafka:2.2.11.RELEASE

我的配置:

连接工厂

卡夫卡消费者:

如果应用程序在 If 语句处崩溃,则问题会丢失。

我的理解是'acknowledgement.acknowledge();' 没有完成并且应用程序崩溃然后在重新启动时应该再次处理相同的消息。

我需要帮助来了解我在这里做错了什么。

0 投票
1 回答
140 浏览

apache-kafka - 使用 SeekToCurrentErrorHandler 时,多个服务实例中同一消费者组中的同一消费者如何工作

我已经使用SeekToCurrentErrorHandler测试了我的服务有一个实例,该实例在一个消费者组中有一个消费者。在失败的情况下,重试发生并且记录被提交,因为我有用户ACK 模式作为 RECORD 并且没有使用 ACK 模式作为 MANUAL,因为这会产生问题。

现在,让我们再有 2 个我的服务实例,它具有相同的消费者和相同的消费者组。

我的问题:如果记录已由第一个实例提交,这两个新实例是否会像第一个实例一样重试?

配置

STCEH

0 投票
1 回答
1976 浏览

apache-kafka - 嵌入式 Kafka 测试随机失败

我使用 EmbededKafka 实现了一堆集成测试,以测试我们的一个使用 spring-kafka 框架运行的 Kafka 流应用程序。

流应用程序正在从 Kafka 主题中读取消息,将其存储到内部状态存储中,进行一些转换并将其发送到另一个微服务到请求的主题中。当响应返回响应主题时,它会从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的下游系统之一,每个系统都有自己的主题。

集成测试只是执行业务条件的各种排列。

最初,测试被分成不同的班级。在运行构建时,一个类中的测试与另一类中的测试发生冲突,并出现一些冲突异常。我没有花太多时间在这上面,只是把所有的测试都移到了同一个班级里。这解决了我从 gradle build 或 intelij EDI 通过的所有测试的问题。

这是测试:

对结果感到满意,我推送了我的更改,只是为了注意到构建在我们的 CI 服务器上失败。再次运行它,这次又失败了,失败与第一次不同。我让一位同事看了一下,他也有与 CI 服务器类似的故障经历。我在我的机器上运行了至少 20 次构建,它总是通过。对我的同事进行一项一项的测试也总是通过。

我们得到的最常见的异常是主题 xyz 已经存在,但偶尔会有一些其他异常表明集群不可能是喜欢的或类似的。所有这些异常都向我们表明,在前一个测试中使用的嵌入式 Kafka 在下一个测试开始之前没有完全关闭,尽管使用了DirtiesContext注释。第一个运行的测试总是通过。

我们俩都花了一整天的时间脱掉我们的头发,这是没有办法让它工作的。我们尝试了谷歌带我们去的任何地方,完全没有运气。最后,我们在测试类中留下了唯一的一个测试场景(交互次数最多的一个)并禁用了其余的。

显然,这不是一个可以接受的永久解决方案,我真的很想了解我们做错了什么以及如何解决它。

预先感谢您的意见。