0

我正在为连接到 Kafka 以使用和发布数据的应用程序编写集成测试,为此我正在使用 EmbeddedKafka。部分逻辑是使用特定偏移量的消息。我想模拟这个,因此我的目标是:

  1. 向 EmbeddedKafka 发送一些消息,但具有特定的偏移量
  2. 以相同的偏移量使用它们

这现在不起作用,即我正在发送带有 的消息KafkaHeaders.OFFSET,但它被忽略了,我之后使用的消息具有不同的偏移量。事实上,偏移量只是从 0 开始,然后递增。

MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
        .setHeader(KafkaHeaders.TOPIC, topic)
        .setHeader(KafkaHeaders.MESSAGE_KEY, key)
        .setHeader(KafkaHeaders.PARTITION_ID, partition);
        .setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());

KafkaTemplate<String, String>以标准方式初始化。另一方面,我以标准方式消费:

private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {

    MessagesWithOffsetsConsumer() {
    }

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {

        records.forEach(record -> {
            String id = record.key();
            String dataAssetPayload = record.value();
            int partitionId = record.partition();

            LOGGER.info("Received record: {} offset: {}", id, record.offset());
    }
}

简而言之,接收到的消息的偏移量与onMessage消息构建期间设置的偏移量不同。

有什么办法可以做到这一点?

4

1 回答 1

0

不,在ProducerRecord. 你只能设置这些道具:

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

这确实是从KafkaTemplate逻辑上完成的。KafkaHeaders.OFFSET是消费者方面的属性:

public class ConsumerRecord<K, V> {

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final Headers headers;
    private final K key;
    private final V value;

换句话说:Apache Kafka 不是为明确的偏移设置而设计的。

于 2021-10-18T14:08:04.283 回答