我正在尝试使用最新的 kafka_2.10-0.8.2.1 使用低级 Consumer Java API 手动管理偏移量。为了验证我从 Kafka 提交/读取的偏移量是否正确,我使用了 kafka.tools.ConsumerOffsetChecker 工具。
这是我的主题/消费者组的输出示例:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
这是我对结果的解释:
Offset = 5 --> 这是我的“elastic_search_group”消费者的当前偏移量
logSize = 29 --> 这是最新的偏移量 - 下一条消息的偏移量将到达这个主题/分区
Lag = 24 --> 29-5 - 我的“elastic_search_group”消费者尚未处理多少消息
Pid - 分区 ID
Q1:这是正确的吗?
现在,我想从我的 Java 消费者那里获得相同的信息。在这里,我发现我必须使用两个不同的 API:
卡夫卡.javaapi。OffsetRequest获取最早和最新的偏移量,但是 kafka.javaapi。OffsetFetchRequest获取当前偏移量。
要获得最早(或最新)的偏移量,我会:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
为了获得当前的偏移量,我必须使用完全不同的 API:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2:对吗?为什么有两个不同的 API 来获取非常相似的信息?
Q3:我在这里使用哪个versionId和correlationId有关系吗?我虽然对于 pre-0.8.2.1 kafka 的 versionId 应该是 0,对于 0.8.2.1 及更高版本应该是 1 - 但似乎它也适用于 0 的 0.8.2.1 - 见下文?
因此,对于上述主题的示例状态,以及 ConsumerOffsetChecker 的上述输出,以下是我从 Java 代码中得到的信息:
当前偏移=5;最早偏移=29;最新偏移=29
'currentOffset' 似乎没问题,'latestOffset' 也是正确的,但 'earliestOffset' 呢?我希望它至少是'5'?
Q4:earlyOffset 怎么会比 currentOffset 高?我唯一的怀疑是,由于保留政策,可能来自该主题的消息被清除了……。还有其他可能发生的情况吗?