2

我正在使用 spring-kafka 和 spring-kafka-test 版本 1.0.2.RELEASE。

在我的一个测试中,我的应用程序使用 KafkaTemplate 和大多数默认配置设置连续发送 100 条记录到 EmbeddedKafka 实例上的单个 TopicPartion。

我使用 KafkaTestUtils.getRecords(consumer) 方法尝试从 Kafka 实例中获取记录并验证它们是否已全部发送。

第一次调用 getRecords 时,我只收到一条记录。如果我再次调用它,我会得到另一个 99。

如果我明确地将消费者的位置设置为 TopicPartition 的开头,然后调用 getRecords,我会得到全部 100。

为什么 getRecords 第一次只能得到一条记录?有没有更好的方法来一次获得所有 100 个,然后通过显式调用消费者的 seekToBeginning ?

4

2 回答 2

2

这听起来像是一个时间问题。很有可能您第一次调用时只有一条消息可用poll()- 该方法不能保证将获取多少条消息。当您编写代码时,您不应该假设您会一次收到 X 条记录。Kafka 0.10 中有一个消费者属性max.poll.records,出于测试目的,您可能希望将其设置为 1,然后执行接收循环,直到轮询所有 100 个。

于 2016-08-13T11:30:59.513 回答
0

很可能只是一个竞争条件——消费者坐在里面,poll()并且代理一到达就发送第一条消息。

请参阅属性fetch.min.byteskafka 文档fetch.max.wait.ms

fetch.min.bytes默认为 1。

编辑

您也可以尝试调用flush()before 。KafkaTemplategetRecords()

但是,您的测试不应该真正依赖于一次获取所有消息 - 太脆弱了。

于 2016-08-11T17:41:27.050 回答