我有一个 3 节点 kafka 集群设置。我正在使用storm来阅读来自kafka的消息。我系统中的每个主题都有 7 个分区。
现在我面临一个奇怪的问题。直到 3 天前,一切正常。但是,现在看来我的风暴拓扑无法专门从 2 个分区 - #1 和 #4 读取。
我试图深入研究这个问题,发现在我的 kafka 日志中,对于这两个分区,缺少一个偏移量,即在 5964511 之后,下一个偏移量是 5964513 而不是 5964512。
由于缺少偏移量,简单消费者无法继续进行下一个偏移量。我做错了什么还是一个已知的错误?
这种行为可能是什么原因?
我正在使用以下代码来读取有效偏移量的窗口:
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] validOffsets = response.offsets(topic, partition);
for (long validOffset : validOffsets) {
System.out.println(validOffset + " : ");
}
long largestOffset = validOffsets[0];
long smallestOffset = validOffsets[validOffsets.length - 1];
System.out.println(smallestOffset + " : " + largestOffset );
return largestOffset;
}
这给了我以下输出:
4529948 : 6000878
因此,我提供的偏移量正好在偏移范围内。