考虑以下代码:
public static long Offset = 0L;
FetchRequest req = new FetchRequest(KafkaProperties.topic, 0, Offset,10485760);
ByteBufferMessageSet messageSet = simpleConsumer.fetch(req);
问题是如何获取最后一个偏移量并设置回变量Offset
以从 Kafka 读取下一批数据?
更新: 当我打印数据时,即:
for (MessageAndOffset messageAndOffset : messageSet) {
System.out.println(messageAndOffset);
}
输出如下:
MessageAndOffset(message(magic = 1, attributes = 0, crc = 2000130375, payload = java.nio.HeapByteBuffer[pos=0 lim=176 cap=176]),296215)
MessageAndOffset(message(magic = 1, attributes = 0, crc = 956398356, payload = java.nio.HeapByteBuffer[pos=0 lim=196 cap=196]),298144)
....
....
MessageAndOffset(message(magic = 1, attributes = 0, crc = 396743887, payload = java.nio.HeapByteBuffer[pos=0 lim=179 cap=179]),299136)
文档说最后一个数字是偏移量
MessageAndOffset(message: Message, offset: Long)
也就是说,在上述情况下,我最后一次读取的偏移量将是299136