2

正如这里提到的简单消费者

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

另请注意,我们正在明确检查正在读取的偏移量是否不小于我们请求的偏移量。这是必需的,因为如果 Kafka 正在压缩消息,即使请求的偏移量不是压缩块的开头,提取请求也会返回整个压缩块。因此,我们之前看到的消息可能会再次返回。

最后,我们跟踪阅读的消息数。如果我们在最后一个请求中没有读取任何内容,我们会睡一秒钟,这样我们就不会在没有数据的时候敲打 Kafka。

就像在我的程序中一样,它首先读取一条旧消息,因为它是旧的而进入睡眠状态,然后读取新记录。

有什么办法让 SimpleConsumer 只读取新消息吗?

4

1 回答 1

0

来自同一页面

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

它说要找到要读取的偏移量

Kafka 包含两个常量来提供帮助,kafka.api.OffsetRequest.EarliestTime() 在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime() 只会流式传输新消息。不要假设偏移量 0 是开始偏移量,因为消息会随着时间的推移而超出日志。

于 2013-08-23T22:27:33.330 回答