当我在日常工作中偶然发现这个时,我可以在网上找到一些关于相同的描述如下:
在 Kafka 到 0.8.1.1 的版本中,消费者将他们的偏移量提交给 ZooKeeper。当存在大量偏移量(即,消费者计数 * 分区计数)时,ZooKeeper 不能很好地扩展(尤其是对于写入)。幸运的是,Kafka 现在提供了一种存储消费者偏移量的理想机制。消费者可以通过将偏移量写入持久(复制)和高可用性主题来提交他们在 Kafka 中的偏移量。消费者可以通过读取这个主题来获取偏移量(尽管我们提供了一个内存中的偏移量缓存以便更快地访问)。即,偏移提交是常规的生产者请求(成本低廉),而偏移提取是快速内存查找。
Kafka 官方文档描述了该功能的工作原理以及如何将偏移量从 ZooKeeper 迁移到 Kafka。这个 wiki 提供了示例代码,展示了如何使用新的基于 Kafka 的偏移存储机制。
try {
BlockingChannel channel = new BlockingChannel("localhost", 9092,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
final String MY_GROUP = "demoGroup";
final String MY_CLIENTID = "demoClientId";
int correlationId = 0;
final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
Broker offsetManager = metadataResponse.coordinator();
// if the coordinator is different, from the above channel's host then reconnect
channel.disconnect();
channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
} else {
// retry (after backoff)
}
}
catch (IOException e) {
// retry the query (after backoff)
}