问题标签 [spring-kafka-test]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
70 浏览

java - 嵌入式 Spring Kafka 尝试启动运行时 Kafka

我有一个 Spring Kafka 项目,我已经为它编写了一些单元测试。当我执行测试类时,项目尝试使用实际的 Kafka 代理,而不是我的测试类中注释的嵌入式 Kafka 代理。

我尝试过的检查到 gitlab --> https://gitlab.com/mohammad.mnajar/spring-kafka-unit-test

任何帮助将不胜感激。

0 投票
1 回答
850 浏览

apache-kafka - 在 Spring Cloud Stream 中使用嵌入式 Kafka 进行集成测试时,如何立即验证消息是否已得到确认?

我们使用 Spring Cloud Stream Kafka Binder(与 Project Reactor 集成,即 Flux)和手动偏移提交 (即autoCommitOffset = false)。

我们正在尝试使用来自 spring-kafka-test 的 Embedded Kafka编写一个集成测试,该测试应该通过在测试向我们的主题发送消息之前和之后使用管理客户端 手动读取消费者组偏移量来断言这一切正常。

测试间歇性地失败。使用等待性,我们现在最多等待 10 秒来轮询偏移量,这似乎解决了我们的大部分问题,因为偏移量将在大约 7 秒后改变——但这对于测试来说并不令人满意。

一旦我们通过调用手动确认消息接收,有没有办法确保 Spring Cloud Stream Kafka Binder 立即写入偏移更改Acknowledgement.acknowledge()

换句话说:我们如何验证acknowledge在我们的测试中被调用而无需等待?

我们使用 Kotlin、Mockito 和Mockito-kotlin,因此不能使用 PowerMockito。

0 投票
1 回答
2615 浏览

java - 单元测试 MessageListener 类

如何对实现 spring-kafka MessageListener 接口的类进行单元测试?我有一个监听器类,我正在使用 onMessage 函数手动监听主题。这个函数很简单,只是接收消息。

我的设置是使用 Spring 5.8、Spring-Kafka 2.2.7、Spring-Kafka-Test、JUnit 和没有spring boot。

我一直在尝试来自 Spring 参考文档和其他帖子的一系列不同示例,但似乎没有一个显示一种简单的方法来测试实现 MessageListener 的 Listener 类。

我不确定是否需要设置 EmbeddedKafkaBroker 或 EmbeddedKafkaRule,或者是否有不同的测试方法。当我尝试使用 EmbeddedKafkaRule 时,我收到一条错误消息,显示 NoClassDefFound。

但是我不明白这个测试用例如何影响我的 onMessage 函数。

我想单元测试的课程

这会引发一个奇怪的错误,上面写着这个...... NoClassDefFound

0 投票
1 回答
264 浏览

java - 单元测试 KafkaMessageListenerContainer

KafkaMessageListenerContainer是否可以对@Bean进行单元测试?我想知道是否可以对用于构建容器的代码进行放置单元测试。我正在尝试确保容器已成功构建。

但是,当我使用一组假的消费者属性直接对执行此操作的函数进行单元测试时,我收到一条错误消息

Failed to Construct Kafka Consumer.... No resolvable bootstrap urls given in bootstrap.servers

这个错误让我相信这可能无法以这种方式进行测试。有没有另一种方法来测试这个,或者有没有办法实际构建 kafka 消息侦听器容器并通过单元测试对其进行验证?

0 投票
1 回答
2485 浏览

java - 嵌入式 Kafka 从错误的分区数开始

我在 JUnit 测试中启动了 EmbeddedKafka 的实例。我可以在应用程序中正确读取已推送到流中的记录,但我注意到的一件事是每个主题只有一个分区。谁能解释为什么?

在我的应用程序中,我有以下内容:

这将返回一个包含一项的列表。当针对具有 3 个分区的本地 Kafka 运行时,它会按预期返回一个包含 3 个项目的列表。

我的测试看起来像:

0 投票
1 回答
2716 浏览

apache-kafka - 带有嵌入式 Kafka 的 Spring Kafka 测试在删除日志时失败

我正在使用带有嵌入式 kafka 的 spring kafka 进行 JUnit 测试,它在 windows 上的每个测试都会出错:

我只是做了如下的基本配置

任何解决建议或任何解决方法表示赞赏。

0 投票
1 回答
1064 浏览

apache-kafka - Spring Kafka (2.2.7.RELEASE) 与。kafka-clients:2.2.1 嵌入式代理启动期间的 IOException

在依赖检查警告的驱动下,我们尝试使用 spring-kafka:2.2.7 在我们的设置中将 org.apache.kafka:kafka-clients 的版本提升到 2.2.1 版本。

因此,使用 EmbeddedKafkaRule 的测试在代理启动期间失败,并出现 IOException 声称“无法加载 /some/path..”

我们确保类路径上没有冲突的 kafka-clients 版本,并且还尝试将 EmbeddedKafkaRule 的 logs.dir 指定到 maven 目标文件夹下的某个文件夹,如上例中的“target/embedded-kafka”。两者都没有成功。

有人遇到同样的问题并解决了吗?

0 投票
2 回答
3444 浏览

spring-boot - 如何使用 Spring Kafka 测试 Kafka Streams 应用程序?

我正在使用 Kafka Streams、Spring-Kafka 和 Spring Boot 编写流应用程序。我找不到任何信息如何在使用 Spring-Kafka 时正确测试 Kafka Streams DSL 完成的流处理。文档提到 EmbeddedKafkaBroker,但似乎没有关于如何处理例如状态存储的测试的信息。

只是为了提供一些我想测试的简单示例。我注册了以下 bean(其中 Item 是 avro 生成的):

测试所有项目编号是否汇总的正确方法是什么?

0 投票
1 回答
777 浏览

spring-kafka - 不能在测试中多次使用 @KafkaListener

我们正在尝试测试一个 cloud-stream-kafka 应用程序,在测试中我们有多个发送消息的测试方法,以及一个等待响应的 @KafkaListener。

但是,第一次测试往往会通过,而第二次测试往往会失败。

任何指针将不胜感激。

似乎@KafkaListener正在为每个测试注册一个实例,因为我们注意到使用该id值会导致java.lang.IllegalStateException: Another endpoint is already registered with id 'kafka-listener-consumer'

@RabbitListener当使用的消息传递框架是 RabbitMQ 时,我使用过类似的测试。我希望我可以做类似的事情,因为一些测试用例涉及等待没有消息被发布,我们可以用assertFalse(latch.await(10, TimeUnit.SECONDS))

0 投票
0 回答
1732 浏览

java - Spring Boot Kafka - 使用 yaml/properties 中的类型信息的消费者自定义 JsonDeserializer,没有 @Bean

是否可以在不使用of和定义@Configuration类的情况下将类型信息传递给 Kafka 消费者的反序列化器?@BeanconsumerFactory()containerFactory

在该@Bean方法中,我必须将 yml 文件中已有的所有配置再次放入地图并将其传递给工厂的构造函数,但我认为这是一种开销。我想找到一种方法将所有配置保存在 yaml/properties 中。(而且我认为将配置放入 yaml 而不是代码中更简洁)

我想为每个带有注释的消费者/每个方法指定类型,@KafkaListener因为我将收到不同的 JSON,映射到其相应的 DTO,所以通用spring.json.value.default.type在这里不适用。(而且我认为它很难看)

我正在使用我的代码进行测试(Spring Kafka Test with an EmbeddedKafka),但现在它抱怨找不到类型信息:

我知道文档,但我不知道如何正确执行:

https://docs.spring.io/spring-kafka/reference/html/#spring-messaging-message-conversion