问题标签 [spring-kafka-test]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
455 浏览

apache-kafka - 使用 Java 查找 Kafka 主题中的消息数

我正在使用Kafka Utils来测试某个基于 kafka 的消息传递系统。我想在不使用kafka-console-consumer.sh脚本的情况下找出特定主题中的消息数。我似乎找不到基于 KafkaTestUtils 的方式或 java 中的任何方式来帮助我实现这一目标。其他类似问题的答案都没有帮助我。

0 投票
0 回答
176 浏览

spring-boot - 在 gitlab 中部署代码时执行未知的测试类方法

我正在尝试在 gitlab 中部署我的代码。

它有一个单元测试类来测试我的 Kafka 生产者。

它在本地正常运行。但是当我尝试在 gitlab 中构建它时,它会因为测试而失败。

但是我在测试的任何地方都没有一个名为 classMethod 的测试。

为什么会发生这种情况以及如何解决?

这是我的测试。

我删除了断言部分以减小大小

0 投票
1 回答
494 浏览

spring-boot - 使用@EmbeddedKafka 时做@DirtiesConfig 的正确方法是什么

我们的项目中有一个“小”问题:“无法建立到节点 0 的连接。代理可能不可用。” 测试运行时间非常长,并且每秒至少记录一次此消息。但我发现,如何摆脱它。继续阅读。如果配置/注释中有不正确的地方,请告诉我。

首先是版本:

这会自动带来

现在我们将考虑由以下注释的集成测试:

然后我们在其中进行了一些测试,例如:

这个测试只是创建一些请求数据,调用 post,然后将数据持久化到 DB 中。然后我们将使用 GET 来查找,是否可以找到这些数据。琐碎的。对我来说有点奇怪的是这里的事务处理。测试类使用@Transactional 注释,但根据日志事务仅在Controller 方法上打开,在此示例中(它应该在服务中,当然)相同的@Transactional 注释。两者都带有 TxType.REQUIRED 传播。这将导致测试发起的回滚实际上没有回滚,因为事务已经提交。如果你知道,为什么会这样,请指教。但这不是这个问题的症结所在。到目前为止,我们只是将@DirtiesContext 放在这个方法上,它应该只是重新初始化上下文。它解决了未回滚数据的问题,上下文重新初始化的高成本。但以下消息开始出现在日志中:

删除它@DirtiesContext并将其放置在类级别上

具有相同的行为(除了荒谬的额外开销)。但是,如果我删除所有@DirtiesContext,并手动清除数据库并提交更改,以便在每次测试后还原更改,一切正常,没有警告或错误。

所以我认为有两件事。我的问题是由不正确的 tx 处理引起的(请帮助),但 @DirtiesContext 应该也可以与 spring-kafka 一起使用,这似乎不起作用。原则上不可能(或者是?),或者我的某些配置不正确(请帮助),或者它可能是一个错误?

0 投票
1 回答
1417 浏览

spring - @EmbeddedKafka 尝试多次注册 AppInfo mbean?

我使用@EmbeddedKafka 进行了多个集成测试,在迁移到更新的 springboot 版本 2.1.8.RELEASE 后,日志中充满了这些堆栈跟踪。知道是什么原因造成的吗?

0 投票
1 回答
448 浏览

spring-boot - 即使测试通过,KafkaTest 期间也会出现异常

我正在使用 EmbeddedKafka 运行 Junit 测试,我能够生产和使用 EmbeddedKafka 代理,并成功断言我发送的数据。

但是我可以在堆栈跟踪中看到很多异常,这些异常发生在断言完成之后。

1)

java.io.IOException:在 org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) ~[kafka-clients-2.0.1.jar:na] 处读取响应之前断开与 0 的连接kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:240) ~[kafka_2.11-2.0.1.jar:na] at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0. 1.jar:na]

2)

2019-10-04 15:49:27.123 WARN 1812 --- [r-0-send-thread] kafka.controller.RequestSendThread:[RequestSendThread controllerId=0] 控制器 0 与代理 localhost:54745 的连接(id:0 机架: null) 不成功

java.net.SocketTimeoutException:在 kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:280) [kafka_2.11-2.0.1.jar:na] 在 1000 毫秒内无法连接 kafka.controller.RequestSendThread.doWork(ControllerChannelManager .scala:233) [kafka_2.11-2.0.1.jar:na] 在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]

3)

java.io.IOException: 连接到 localhost:54745 (id: 0 rack: null) 失败。在 org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70) ~[kafka-clients-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:27​​9) [kafka_2 .11-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233) [kafka_2.11-2.0.1.jar:na] 在 kafka.utils.ShutdownableThread.run(ShutdownableThread. scala:82) [kafka_2.11-2.0.1.jar:na]

4)

java.lang.InterruptedException: null at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(未知来源)~[na:1.8.0_181] at java.util.concurrent.CountDownLatch.await(未知来源)~[na:1.8 .0_181] 在 kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:69) [kafka_2.11-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.backoff$1(ControllerChannelManager.scala:221) ~[kafka_2 .11-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:235) ~[kafka_2.11-2.0.1.jar:na] 在 kafka.utils.ShutdownableThread.run(ShutdownableThread .scala:82) [kafka_2.11-2.0.1.jar:na]

我的测试:

为什么会有这些例外?它们是在什么过程中发生的?请帮忙

0 投票
1 回答
2640 浏览

apache-kafka-streams - 使用 Kafka 2.2.x 的 Spring 嵌入式 Kafka

升级 kafka-client 和 kafka-stream 2.2.3 所需的 Spring-Kafka 2.2.x 时,嵌入式 Kafka Broker for Spring 不起作用。它似乎需要在log.dir目录中有一个 meta.properties 文件,并且需要设置一个 broker.id。但是,它会为绑定地址引发异常。有没有人让这个工作?

经纪人属性

样品测试

0 投票
1 回答
1337 浏览

apache-kafka-streams - 如何使用 Kafka Streams 对 Spring Cloud Stream 进行单元测试

我一直在尝试让 Spring Cloud Stream 与 Kafka Streams 一起工作一段时间,我的项目使用嵌入式 kafka 来测试 Kafka DSL,我使用这个存储库作为我的测试实现的基础(它本身就是一个测试用例这个问题)。

我在这里创建了一个存储库来演示这一点。

基本上,当使用使用“Processor.class”和 MessageChannel 作为实现的“DemoApplicationTest.ExampleAppWorking.class”时,测试是成功的。

但是当使用使用自定义绑定和 KStream 作为实现的“DemoApplicationTest.ExampleAppNotWorking.class”时,它会失败。

第二个用例中的错误是这样的:

我错过了什么?

0 投票
3 回答
12293 浏览

apache-kafka - 连接 Kafka 3.0 中面临的问题 - org.apache.kafka.common.KafkaException:无法加载 SSL 密钥库

我正在尝试使用 SSL 连接到 Kafka 3.0,但面临加载 SSL 密钥库的问题

我尝试了许多可能的值,但没有帮助

我已经尝试更改位置,更改位置的值,但仍然没有帮助

面临的错误:

任何人都可以帮忙,密钥库可能有什么问题?正如它所说“无法加载 JKS 类型的 SSL 密钥库 /kafka.keystore.jks”

0 投票
2 回答
3373 浏览

spring-boot - EmbeddedKafka 不工作导致 Scala 错误

我有一个工作的 spring-boot 基于 java-gradle 的服务,可以生成和使用 Kafka 消息。但是我无法使用带有 @EmbeddedKafka 注释的 spring-kafka-test 库或使用 @ClassRule 方式创建集成测试。在这两种方式中,我最终都会遇到相同的错误(关于 Scala 如下所示)。如果有人对幕后可能发生的事情有任何线索,那将非常有帮助。

Spring Boot 版本:2.1.6.RELEASE Spring Kafka 版本:2.2.7.RELEASE

生产者配置:

生产者代码:

属性文件:

我在开始测试时尝试了两种方法和相同的错误:

测试类规则:

使用 EmbeddedKafka 进行测试:

两种情况下的错误:

0 投票
3 回答
2562 浏览

mockito - 等到@KafkaListener 用@EmbeddedKafka 完成测试中的消息处理

我有一个@KafkaListener消费者,想编写集成测试。事实是,在处理消息并且数据库中的某些状态发生更改后,似乎很难找到方法Consumer#consume完成执行以执行某些断言的确切时刻。

测试

事实是,如果我将Thread.sleep(1000L)消费者进程消息和所有工作正常但使用 Mockito 它不起作用,因为所有断言在消费者完成其执行之前执行 method Consumer#consume

@KafkaListener是否有机会(使用侦听器等)捕捉消费者确认/完成消息处理以执行具有适当数据库状态的断言的时刻?需要进行集成测试以确保端到端功能正常工作。

我也尝试#verify检查@SpyBean private Service service, method Service#process,但它也不起作用。