5

我有一个 3 节点 kafka 集群设置。我正在使用storm来阅读来自kafka的消息。我系统中的每个主题都有 7 个分区。

现在我面临一个奇怪的问题。直到 3 天前,一切正常。但是,现在看来我的风暴拓扑无法专门从 2 个分区 - #1 和 #4 读取。

我试图深入研究这个问题,发现在我的 kafka 日志中,对于这两个分区,缺少一个偏移量,即在 5964511 之后,下一个偏移量是 5964513 而不是 5964512。

由于缺少偏移量,简单消费者无法继续进行下一个偏移量。我做错了什么还是一个已知的错误?

这种行为可能是什么原因?

我正在使用以下代码来读取有效偏移量的窗口:

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
    OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    long[] validOffsets = response.offsets(topic, partition);
    for (long validOffset : validOffsets) {
        System.out.println(validOffset + " : ");
    }
    long largestOffset = validOffsets[0];
    long smallestOffset = validOffsets[validOffsets.length - 1];
    System.out.println(smallestOffset + " : " + largestOffset );
    return largestOffset;
}

这给了我以下输出:

4529948 : 6000878

因此,我提供的偏移量正好在偏移范围内。

4

1 回答 1

1

抱歉回答晚了,但是...

我为这种情况编写了一个 Long 实例 var 来保存下一个要读取的偏移量,然后在 fetch 之后检查返回的 FetchResponse 是否有错误()。如果出现错误,我将下一个偏移值更改为一个合理的值(可能是下一个偏移或最后一个可用的偏移)并重试。

于 2014-12-15T17:25:03.050 回答