1

这是“ zookeeper 在哪里存储 Kafka 集群和相关信息? ”的后续问题,基于 Armando Ballaci 提供的答案。

现在很明显,消费者偏移量存储在 Kafka 集群中一个名为__consumer_offsets. 没关系,我只是想知道这些偏移量的检索是如何工作的。

主题不像 RDBS,我们可以基于某个谓词查询任意数据。例如 - 如果数据存储在 RDBMS 中,可能像下面这样的查询将获取某个消费者组的特定消费者的主题的特定分区的消费者偏移量。

select consumer_offset__read, consumer_offset__commited from consumer_offset_table where consumer-grp-id="x" and partitionid="y"

但显然这种检索在 Kafka Topics 上是不可能的。那么从主题中检索机制是如何工作的呢?有人可以详细说明吗?

(来自 Kafka 分区的数据在 FIFO 中读取,如果遵循 Kafka 消费者模型来检索特定偏移量,则必须处理大量额外数据并且速度会很慢。所以我想知道它是否以其他方式完成。 ..)

4

2 回答 2

2

当我在日常工作中偶然发现这个时,我可以在网上找到一些关于相同的描述如下:

在 Kafka 到 0.8.1.1 的版本中,消费者将他们的偏移量提交给 ZooKeeper。当存在大量偏移量(即,消费者计数 * 分区计数)时,ZooKeeper 不能很好地扩展(尤其是对于写入)。幸运的是,Kafka 现在提供了一种存储消费者偏移量的理想机制。消费者可以通过将偏移量写入持久(复制)和高可用性主题来提交他们在 Kafka 中的偏移量。消费者可以通过读取这个主题来获取偏移量(尽管我们提供了一个内存中的偏移量缓存以便更快地访问)。即,偏移提交是常规的生产者请求(成本低廉),而偏移提取是快速内存查找。

Kafka 官方文档描述了该功能的工作原理以及如何将偏移量从 ZooKeeper 迁移到 Kafka。这个 wiki 提供了示例代码,展示了如何使用新的基于 Kafka 的偏移存储机制。

try {
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
 
        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different, from the above channel's host then reconnect
            channel.disconnect();
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          5000 /* read timeout in millis */);
            channel.connect();
        } else {
            // retry (after backoff)
        }
    }
    catch (IOException e) {
        // retry the query (after backoff)
    }
于 2020-07-05T10:32:45.943 回答
1

在 Kafka 到 0.8.1.1 的版本中,消费者将他们的偏移量提交给 ZooKeeper。当存在大量偏移量(即,消费者计数 * 分区计数)时,ZooKeeper 不能很好地扩展(尤其是对于写入)。幸运的是,Kafka 现在提供了一种存储消费者偏移量的理想机制。消费者可以通过将偏移量写入持久(复制)和高可用性主题来提交他们在 Kafka 中的偏移量。消费者可以通过读取这个主题来获取偏移量(尽管我们提供了一个内存中的偏移量缓存以便更快地访问)。即,偏移提交是常规的生产者请求(成本低廉),而偏移提取是快速内存查找。

Kafka 官方文档描述了该功能的工作原理以及如何将偏移量从 ZooKeeper 迁移到 Kafka。

这个想法是,如果您需要您描述的此类功能,则需要将数据存储在 RDBS 或 NoSQL 数据库或 ELK Stack 中。一个好的模式是使用 Sink 连接器通过 Kafka Connect。Kafka 中的正常消息处理是通过消费者或流定义完成的,这些定义在事件发生时对其做出反应。在某些情况下,您当然可以寻求抵消或时间戳,这是完全可能的......

在最新版本的 Kafka 中,偏移量不再保存在 Zookeeper 中。所以 Zookeeper 不参与消费者偏移处理。

于 2020-07-05T10:34:57.623 回答