问题标签 [spring-kafka-test]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2484 浏览

java - 如何为 Spring Kafka Listener 创建集成测试

我有一个微服务发送到另一个应该消费的微服务消息。

所以,kafka 配置工作,一切工作,但我需要为这段代码创建一个集成测试,我不知道如何。

我的 KafkaConsumer.Class 带有组件注释:

我的集成测试是

抽象类:

0 投票
1 回答
7339 浏览

spring-boot - 如何使用 EmbeddedKafkaRule/EmbeddedKafka 设置 Spring Kafka 测试以修复 TopicExistsException 间歇性错误?

我在测试我的 Kafka 消费者和生产者时遇到了问题。集成测试间歇性地失败,出现TopicExistsException.

这就是我当前的测试类UserEventListenerTest的样子——对于其中一位消费者来说:

该类UserEventListener使用来自 的消息user-event-topic-UserEventListenerTest并将消息发布到some-topic-after-UserEvent

正如您从设置中看到的那样,我有一个测试生产者将向其发布消息user-event-topic-UserEventListenerTest以便我可以测试是否UserEventListener使用该消息和一个测试消费者将从该消息使用该消息some-topic-after-UserEvent以便我可以查看是否将消息发布UserEventListenersome-topic-after-UserEvent处理记录。

KafkaConfigProperties课程如下。

application.yml看起来像这样。

错误日志

我试过的:

  • 通过指定引导配置,在每个测试中使用不同的引导服务器地址,例如@SpringBootTest(properties = ["application.kafka.bootstrap=localhost:2345"])
  • 通过覆盖主题配置,在每个测试中使用不同的主题名称,@SpringBootTest就像上一个要点中的引导服务器覆盖一样
  • 添加@DirtiesContext到每个测试类

软件包版本

  • 科特林 1.3.61
  • Spring Boot - 2.2.3.RELEASE
  • io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE
  • org.springframework.kafka:spring-kafka-test:2.3.4.RELEASE (仅测试实现)

问题

EmbeddedKafkaRule我有多个使用并设置或多或少相同的测试类。对于它们中的每一个,我指定了不同的 kafka 引导服务器地址和主题名称,但我仍然间歇性地看到 TopicAlreadyExists 异常。

我可以做些什么来使我的测试类保持一致?

0 投票
1 回答
1990 浏览

unit-testing - Kafka Streams 测试:java.util.NoSuchElementException:未初始化的主题:“output_topic_name”

我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是

我使用EventSerde类来序列化和反序列化值。

当我运行此代码时,它会java.util.NoSuchElementException: Uninitialized topic: processed_events通过以下堆栈跟踪给出错误:

如您所见,我已经初始化了输入和输出主题。我还调试了代码,当我从输出主题读取值时发生错误

我不明白我还应该做什么来初始化 outputTopic。谁能帮我解决这个问题?

我正在使用 apache kafka-streams-test-utils 2.4.0 和 kafka-streams 2.4.0

0 投票
1 回答
3786 浏览

spring - 带有junit5的spring嵌入式kafka-引导服务器中没有给出可解析的引导URL

我正在尝试使用 Embedded kafka 进行测试。我正在使用 spring boot 和 junit5,如下所示

但是,我的测试失败了No resolvable bootstrap urls given in bootstrap servers

我也在 yml 文件中使用测试配置文件

请帮忙。

0 投票
0 回答
132 浏览

spring - spring kafka 嵌入式代理 - 我的实际听众永远不会被触发

我正在使用带有 spring boot 和 junit 5 的 Kafka 嵌入式代理。我已经能够成功连接并看到嵌入式代理正在运行。

在我的设置方法中,我将一些消息注入到我的实际代码监听的队列中

尽管设置方法中没有遇到错误,但我的消费者/听众永远不会被触发

我的消费者设置如下

在其他地方我设置了我的 ListenerContainerFactory ,名称如下

这可能有什么问题?我在测试用例中的断言失败,此外,如果我的消费方法被调用,我看不到应该打印的日志语句。

我有一种感觉,自动配置是由于@SpringBootTest并且@EmbeddedKafka正在设置其他一些侦听器容器工厂,所以我的@KafkaListener注释可能是错误的。我知道,它有点模糊,但你能告诉我看什么/在哪里看吗?如果我作为消费者运行,@SpringBootApplication我的消费者会从实际队列中拉入消息。所以我的实际应用程序没有问题。它的测试没有执行按照预期。

请帮忙。

编辑 1:我已经spring.kafka.consumer.auto-offset-reset=earliest在我的 yml 文件中设置了。

0 投票
2 回答
505 浏览

spring-kafka - 使用 Spring Boot 2.1.12 和 Spring for Apache Kafka 2.2.12 进行 JUnit 测试

只是想找出一个简单的例子,使用 Spring Boot 2.1.12 和 Spring for Apache Kafka 2.2.12 与 KafkaListener 一起工作,以重试最后一条失败的消息。如果消息失败,则应将消息重定向到将进行重试的另一个主题。我们将有 4 个主题。topic, retryTopic, sucessTopic 和 errorTopic 如果 topic 失败,应该重定向到 retryTopic,在那里进行 3 次重试尝试。如果这些尝试失败,则必须重定向到 errorTopic。如果主题和重试主题都成功,则应重定向到成功主题。我需要用JUnit Test 覆盖 90% 的案例。

0 投票
2 回答
195 浏览

spring-kafka - Spring Kafka Unit Tests 触发监听,但是方法使用consumer.poll无法获取消息

我们正在使用 spring-kafka-test-2.2.8-RELEASE。当我使用模板发送消息时,它正确触发了监听器,但是在consumer.poll中无法获取消息内容。如果我实例化 KafkaTemplate 而不在类属性中“连接”它并基于生产者工厂实例化它,它会发送消息,但不会触发 @KafkaListener,只有在我在 @Test 方法中设置消息侦听器时才有效。我需要触发 kafka 监听器,并意识到接下来会调用哪个主题(“成功”主题执行时没有错误,“errorTopic”监听器抛出异常)和消息内容。

0 投票
1 回答
373 浏览

unit-testing - Kafka 主题单元测试

我正在使用以下方法在 Kafka 中创建主题,

我的测试课是

我收到以下错误,

@EmbeddedKafka只是想确保主题出现在列表中。请问这是正确的方法还是任何其他建议?

0 投票
0 回答
143 浏览

java - Eclipse 问题:调试时触发单元测试,但运行时未触发

目标:我想“单元”测试 kafka 生产者向 docker 容器中托管的 kafka 发送消息。换句话说,它必须连接到外部卡夫卡,而不是用于单元测试的常用卡夫卡嵌入式卡夫卡。

我可以从命令行成功访问我的 kafka docker 容器:

我在调试单元测试时也取得了成功。我检查过,运行和调试配置都是一样的。

然而,当我运行我的 junit 测试时,我看不到任何尝试连接到同一个 kafka 代理的方法。这是配置。您可能会注意到我尝试在所有我能想象到地址可能被读取的地方添加

“单元测试”(讨论它是单元还是集成超出了这个问题)

源/测试/java

src/test/resources/application.properties

kafka 生产者配置(与运行应用程序相同,即没有测试目的)

源/主/java

生产者(与生产案例相同)

运行 junit 测试时的日志

调试单元测试时的日志:

...

*** 添加

实际上,只有当我在其上指向断点时才会调用单元测试

蚀

0 投票
1 回答
556 浏览

apache-kafka - KStream to KTable Inner Join 每次使用相同数据处理时产生不同数量的记录

我想做一个 KStream 到 KTable Join。使用 KTable 作为查找表。以下步骤显示了执行代码的顺序

  1. 构造 KTable

  2. ReKey KTable

  3. 构造 KStream

  4. ReKey KStream

  5. 加入 KStream - KTable

假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,假设 KStreams 中的每个键在 KTable 中都有一条记录。所以预期的输出将是 8000 条记录。

每次我第一次加入或启动应用程序时。预期输出为 8000 条记录,但有时我只看到 6200 条记录,有时 8000 条完整的记录集(两次),有时没有记录,等等。

  • 问题1:为什么每次运行应用程序时记录都不一致?

    在 KTable 被构造(construct + Rekey)之前,KStreams 被构造并且数据可用于从 KStream 端连接,然后连接从 KTable 开始,因此在 KTable 构造之前不会在最终连接中看到数据。一旦构建了 KTable,我们就可以看到剩余记录的连接发生了。

  • 问题2:如何解决记录中加入不一致的问题?

    我尝试使用嵌入式 Kafka 进行 KStream 和 Ktable 连接的测试用例。有 10 条来自 KStreams 的记录和 3 条来自 KTable 的记录用于进程。当我第一次运行测试用例时,没有加入,加入后我没有看到任何数据。当第二次运行时,它运行完美。如果我清除状态存储然后回到零。

  • 问题3:为什么会发生这种行为?

    我尝试使用 KSQL,连接运行良好,我得到了 8000 条记录,然后我进入 KSQL 源代码,我注意到 KSQL 也在执行相同的连接功能。

  • 问题 4:KSQL 是如何解决这个问题的?

我看到几个示例建议的答案

我正在使用 spring 云流作为依赖项。

我还看到在 JIRA 的某个地方有一个未解决的问题。