1

这是我正在尝试的

Collection <TopicPartition> partitions = consumer.partitionsFor(topic).stream();

以及如何表明您已经结束或不再有消息可供使用。如果偏移量与当时经纪人的结束偏移量不匹配怎么办。

有什么建议么。

4

1 回答 1

0

为了获得最新的偏移量,您可以使用命令行:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic topicName

或以 Java 编程方式:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    consumer.subscribe(Arrays.asList("topicName"));
        Set<TopicPartition> assignment;
        while ((assignment = consumer.assignment()).isEmpty()) {
            consumer.poll(Duration.ofMillis(500));
        }
        consumer.endOffsets(assignment).forEach((partition, offset) -> System.out.println(partition + ": " + offset));
}

现在,如果您想强制消费者从最新的偏移量开始消费,您可以使用以下属性:

props.put("auto.offset.reset", "latest"); 
// or props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

或强制它从最新的偏移量使用

consumer.seekToEnd();

于 2020-04-21T17:48:40.373 回答