10

我正在尝试使用最新的 kafka_2.10-0.8.2.1 使用低级 Consumer Java API 手动管理偏移量。为了验证我从 Kafka 提交/读取的偏移量是否正确,我使用了 kafka.tools.ConsumerOffsetChecker 工具。

这是我的主题/消费者组的输出示例:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group   elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group           Topic                          Pid Offset          logSize         Lag             Owner
elastic_search_group my_log_topic              0   5               29              24              none

  这是我对结果的解释:

Offset = 5 --> 这是我的“elastic_search_group”消费者的当前偏移量

logSize = 29 --> 这是最新的偏移量 - 下一条消息的偏移量将到达这个主题/分区

Lag = 24 --> 29-5 - 我的“elastic_search_group”消费者尚未处理多少消息

Pid - 分区 ID

Q1:这是正确的吗?

现在,我想从我的 Java 消费者那里获得相同的信息。在这里,我发现我必须使用两个不同的 API:

卡夫卡.javaapi。OffsetRequest获取最早和最新的偏移量,但是 kafka.javaapi。OffsetFetchRequest获取当前偏移量。

要获得最早(或最新)的偏移量,我会:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];

为了获得当前的偏移量,我必须使用完全不同的 API:

short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>(); 
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();

Q2:对吗?为什么有两个不同的 API 来获取非常相似的信息?

Q3:我在这里使用哪个versionId和correlationId有关系吗?我虽然对于 pre-0.8.2.1 kafka 的 versionId 应该是 0,对于 0.8.2.1 及更高版本应该是 1 - 但似乎它也适用于 0 的 0.8.2.1 - 见下文?

因此,对于上述主题的示例状态,以及 ConsumerOffsetChecker 的上述输出,以下是我从 Java 代码中得到的信息:

当前偏移=5;最早偏移=29;最新偏移=29

'currentOffset' 似乎没问题,'latestOffset' 也是正确的,但 'earliestOffset' 呢?我希望它至少是'5'?

Q4:earlyOffset 怎么会比 currentOffset 高?我唯一的怀疑是,由于保留政策,可能来自该主题的消息被清除了……。还有其他可能发生的情况吗?

4

1 回答 1

11

我正在寻找在分区中发现滞后的方法。这涉及您已采取的相同步骤。到目前为止,无论我学到什么,我都可以给你答案。

  1. logSize 直接指向该特定分区中累积了多少消息。或者,它指定该分区中消息的最大偏移量。偏移量是最后一次成功消费消息的偏移量。所以滞后只是日志大小和偏移量之间的差异。
  2. 是的,它是正确的。到目前为止,这些是找到当前偏移量和最早或最新偏移量的仅有的两种方法
  3. 我不知道为什么需要指定 versionId。您可以使用kafka.api.OffsetRequest.CurrentVersion()获取 versionId。因此可以避免硬编码。您可以放心地将相关性 ID 假设为 0。
  4. 这很奇怪。当我使用 EarliestTime() 时,即使我当前的偏移量进一步增加,我也会将最早的偏移量设为 0。这意味着它是分区的开始。因此,当某些消息在未来某个时间过期时,这个最早的偏移量将是某个非零数字。现在,如果由于保留策略滞后而清除了消息,则应该更改。我不确定这种行为。可以确定的一种方法是,在注意到此类阅读并检查其日志后运行消费者。它应该显示像这样的行。

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: 重置请求的消耗偏移量:2:获取的偏移量 = 405952:消耗的偏移量 = 335372 到 335372 2015-06-09 18:49:15: : DEBUG :: PartitionTopicInfo:52 :: 重置请求的消费偏移量:2: 获取的偏移量 = 405952: 消费的偏移量 = 335373 到 335373

请注意,在上面的日志行中,获取的偏移量保持不变,而消耗的偏移量正在增加。最后它会以

2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo:52 :: 重置请求的消耗偏移量:2:获取的偏移量 = 405952:消耗的偏移量 = 405952 到 405952

那么这意味着由于从 335372 到 405952 的日志保留策略偏移量已过期

于 2015-06-15T12:36:51.120 回答