我正在为连接到 Kafka 以使用和发布数据的应用程序编写集成测试,为此我正在使用 EmbeddedKafka。部分逻辑是使用特定偏移量的消息。我想模拟这个,因此我的目标是:
- 向 EmbeddedKafka 发送一些消息,但具有特定的偏移量
- 以相同的偏移量使用它们
这现在不起作用,即我正在发送带有 的消息KafkaHeaders.OFFSET
,但它被忽略了,我之后使用的消息具有不同的偏移量。事实上,偏移量只是从 0 开始,然后递增。
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.PARTITION_ID, partition);
.setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());
KafkaTemplate<String, String>
以标准方式初始化。另一方面,我以标准方式消费:
private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {
MessagesWithOffsetsConsumer() {
}
@Override
public void onMessage(List<ConsumerRecord<String, String>> records) {
records.forEach(record -> {
String id = record.key();
String dataAssetPayload = record.value();
int partitionId = record.partition();
LOGGER.info("Received record: {} offset: {}", id, record.offset());
}
}
简而言之,接收到的消息的偏移量与onMessage
消息构建期间设置的偏移量不同。
有什么办法可以做到这一点?