1

我们使用 Spring Cloud Stream Kafka Binder(与 Project Reactor 集成,即 Flux)和手动偏移提交 (即autoCommitOffset = false)。

我们正在尝试使用来自 spring-kafka-test 的 Embedded Kafka编写一个集成测试,该测试应该通过在测试向我们的主题发送消息之前和之后使用管理客户端 手动读取消费者组偏移量来断言这一切正常。

测试间歇性地失败。使用等待性,我们现在最多等待 10 秒来轮询偏移量,这似乎解决了我们的大部分问题,因为偏移量将在大约 7 秒后改变——但这对于测试来说并不令人满意。

一旦我们通过调用手动确认消息接收,有没有办法确保 Spring Cloud Stream Kafka Binder 立即写入偏移更改Acknowledgement.acknowledge()

换句话说:我们如何验证acknowledge在我们的测试中被调用而无需等待?

我们使用 Kotlin、Mockito 和Mockito-kotlin,因此不能使用 PowerMockito。

4

1 回答 1

2

问题是Consumer不是线程安全的。提交必须在容器线程上完成。如果消费者在你确认时坐在poll()里面,你必须等待直到pollTimeout偏移量才会被提交。

默认pollTimeout值为 5 秒。

您可以添加一个ListenerContainerCustomizer @Bean来修改ContainerProperties.

于 2019-07-01T17:41:23.553 回答