每次我运行消费者时,谁能告诉我如何从一开始就使用 Kafka Consumer API 读取消息。
11 回答
这适用于 0.9.x 消费者。基本上,当您创建消费者时,您需要使用 property 为该消费者分配一个消费者组 id ConsumerConfig.GROUP_ID_CONFIG
。每次启动消费者执行类似操作时随机生成消费者组 ID properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
(属性是您将传递给构造函数的 java.util.Properties 的一个实例new KafkaConsumer(properties)
)。
随机生成客户端意味着新的消费者组在 kafka 中没有与之关联的任何偏移量。所以我们在这之后要做的就是为这个场景制定一个策略。正如该auto.offset.reset
物业的文件所述:
如果 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:
- 最早:自动将偏移量重置为最早的偏移量
- latest:自动将偏移量重置为最新的偏移量
- none:如果没有找到先前的偏移量或消费者的组,则向消费者抛出异常
- 其他任何事情:向消费者抛出异常。
因此,从上面列出的选项中,我们需要选择earliest
策略,以便新的消费者组每次都从头开始。
您在 java 中的代码将如下所示:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);
您现在唯一需要弄清楚的是,当有多个消费者属于同一个消费者组但分布时,如何生成随机 id 并将其分布在这些实例之间,以便它们都属于同一个消费者组。
希望能帮助到你!
这样做的一种选择是每次启动时都有一个唯一的组 ID,这意味着 Kafka 会从一开始就向您发送主题中的消息。当你设置你的属性时做这样的事情KafkaConsumer
:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
另一种选择是使用consumer.seekToBeginning(consumer.assignment())
,但这将不起作用,除非 Kafka 首先通过让消费者调用 poll 方法从消费者那里获得心跳。所以 call poll()
,然后如果你想要从头开始的所有记录seekToBeginning()
,然后再调用一次。poll()
这有点骇人听闻,但这似乎是自 0.9 版本以来最可靠的方法。
// At this point, there is no heartbeat from consumer so seekToBeinning() wont work
// So call poll()
consumer.poll(0);
// Now there is heartbeat and consumer is "alive"
consumer.seekToBeginning(consumer.assignment());
// Now consume
ConsumerRecords<String, String> records = consumer.poll(0);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
如果您只是避免保存任何偏移量,消费者将始终在开始时重置。
一种可能的解决方案是在订阅一个或多个主题时使用ConsumerRebalanceListener的实现。ConsumerRebalanceListener 包含在消费者分配或删除新分区时的回调方法。以下代码示例说明了这一点:
public class SkillsConsumer {
private String topic;
private KafkaConsumer<String, String> consumer;
private static final int POLL_TIMEOUT = 5000;
public SkillsConsumer(String topic) {
this.topic = topic;
Properties properties = ConsumerUtil.getConsumerProperties();
properties.put("group.id", "consumer-skills");
this.consumer = new KafkaConsumer<>(properties);
this.consumer.subscribe(Collections.singletonList(this.topic),
new PartitionOffsetAssignerListener(this.consumer));
}
}
public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
private KafkaConsumer consumer;
public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) {
this.consumer = kafkaConsumer;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//reading all partitions from the beginning
for(TopicPartition partition : partitions)
consumer.seekToBeginning(partition);
}
}
现在,只要将分区分配给消费者,就会从头开始读取每个分区。
1) https://stackoverflow.com/a/17084401/3821653
要重置消费者组,可以删除 Zookeeper 组 id
import kafka.utils.ZkUtils;
ZkUtils.maybeDeletePath(<zkhost:zkport>, </consumers/group.id>);`
这是我从头开始读取消息的代码(使用 Java 11)
try (var consumer = new KafkaConsumer<String, String>(config)) {
consumer.subscribe(Set.of(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
});
// polling messages
}
您可以在此处查看完整的代码示例:
https://gist.github.com/vndung/4c9527b3aeafec5d3245c7a3b921f8b1
所以对我来说,有效的是上面建议的组合。关键的变化是包括
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
并且每次都有一个随机生成的 GROUP ID。但仅此一项对我不起作用。出于某种原因,我第一次对消费者进行调查时,它从未得到任何记录。我不得不破解它才能让它工作 -
consumer.poll(0); // without this the below statement never got any records
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
我是 KAFKA 的新手,不知道为什么会发生这种情况,但是对于仍在尝试使其正常工作的其他人,希望这会有所帮助。
props.put("auto.offset.reset", "smallest");
在创建时使用高级消费者集ConsumerConfig
If you are using the java consumer api more specifically org.apache.kafka.clients.consumer.Consumer, You can try the seek* methods.
consumer.seekToBeginning(consumer.assignment())
Here, consumer.assignment() returns all the partitions assigned to a given consumer and seekToBeginning will start from the earliest offset for the given collection of partitions.
始终从偏移量 0 读取,而无需每次都创建新的 groupId。
// ... Assuming the props have been set properly.
// ... enable.auto.commit and auto.offset.reset as default
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(0); // without this, the assignment will be empty.
consumer.assignment().forEach(t -> {
System.out.printf("Set %s to offset 0%n", t.toString());
consumer.seek(t, 0);
});
while (true) {
// ... consumer polls messages as usual.
}
另一种选择是让您的消费者代码保持简单,并使用kafka-consumer-groups
Kafka 附带的命令行工具从外部引导偏移管理。
每次,在启动消费者之前,您都会调用
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--execute --reset-offsets \
--group myConsumerGroup \
--topic myTopic \
--to-earliest
根据您的要求,您可以使用该工具重置主题的每个分区的偏移量。帮助功能或文档解释了这些选项:
--reset-offsets also has following scenarios to choose from (atleast one scenario must be selected):
--to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : Reset offsets to earliest offset.
--to-latest : Reset offsets to latest offset.
--shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
--from-file : Reset offsets to values defined in CSV file.
--to-current : Resets offsets to current offset.
--by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--to-offset : Reset offsets to a specific offset.