0

(当涉及到 kafka 和 kafkajs 时,完全是初学者,所以如果这是一个愚蠢的问题,我很抱歉)

我有一个问题,我们有一个保留 48 小时数据(数百万条记录)的主题;我想知道从这个主题中获取最后“20 分钟”数据然后还流式传输新消息的最佳方式。

此主题中的每条消息都是 JSON,并且具有自纪元 (UTC) 以来以 UNIX 毫秒为单位的时间戳。

性能显然是这里的一个问题

4

1 回答 1

1

Java 客户端中有一个工具可以通过时间戳来寻找偏移量。为此,KafkaJS 中有一个PR ,但似乎没有经过验证和合并。

我想node-rdkafka有。下面是一个例子(参考

consumer.offsetsForTimes(
    [ {topic: 'hi', partition: 0, offset: Date.now() - (20*60*1000) } ],
    timeout,
    console.log
);

当你得到偏移量时,你可以寻找它们并开始阅读。

于 2020-07-28T09:05:42.477 回答