我正在使用 zookeeper 从 kafka 获取数据。在这里,我总是从最后一个偏移点获取数据。有没有办法指定偏移时间来获取旧数据?
有一个选项 autooffset.reset。它接受最小或最大。有人可以解释什么是最小和最大的。autooffset.reset 可以帮助从旧偏移点而不是最新偏移点获取数据吗?
我正在使用 zookeeper 从 kafka 获取数据。在这里,我总是从最后一个偏移点获取数据。有没有办法指定偏移时间来获取旧数据?
有一个选项 autooffset.reset。它接受最小或最大。有人可以解释什么是最小和最大的。autooffset.reset 可以帮助从旧偏移点而不是最新偏移点获取数据吗?
消费者始终属于一个组,对于每个分区,Zookeeper 都会跟踪分区中该消费者组的进度。
要从头开始获取,您可以删除与 Hussain 所指的进度相关的所有数据
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
您还可以指定所需的分区偏移量,如 core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 中所指定
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
但是偏移量不是时间索引的,但是您知道每个分区都是一个序列。
如果您的消息包含时间戳(并注意此时间戳与 Kafka 收到您的消息的那一刻无关),您可以尝试做一个索引器,尝试通过将偏移量增加 N 来逐步检索一个条目,并存储元组(主题 X,第 2 部分,偏移量 100,时间戳)某处。
当您想从指定的时间检索条目时,您可以对粗略索引应用二进制搜索,直到找到您想要的条目并从那里获取。
从Kafka文档中他们说“kafka.api.OffsetRequest.EarliestTime() 在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime() 只会流式传输新消息。不要假设偏移量 0 是开始偏移量,因为消息会随着时间的推移从日志中老化。”
在此处使用 SimpleConsumerExample:https ://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
这可能会有所帮助
请参阅有关 kafka config 的文档:http: //kafka.apache.org/08/configuration.html以查询偏移参数的最小值和最大值。
顺便说一句,在探索 kafka 时,我想知道如何为消费者重播所有消息。我的意思是,如果一个消费者组已经轮询了所有消息并且它想要重新获取这些消息。
可以实现的方式是从zookeeper中删除数据。使用 kafka.utils.ZkUtils 类删除 zookeeper 上的节点。下面是它的用法:
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
目前
Kafka FAQ 给出了这个问题的答案。
如何使用 OffsetRequest 准确获取某个时间戳的消息偏移量?
Kafka 允许按时间查询消息的偏移量,并且以段粒度进行。timestamp 参数是 unix 时间戳,按时间戳查询偏移量会返回不晚于给定时间戳附加的消息的最新可能偏移量。时间戳有 2 个特殊值 - 最新和最早。对于 unix 时间戳的任何其他值,Kafka 将获取不晚于给定时间戳创建的日志段的起始偏移量。由于这个原因,并且由于偏移请求仅以段粒度提供服务,因此对于较大的段大小,偏移获取请求返回不太准确的结果。
为了获得更准确的结果,您可以根据时间 (log.roll.ms) 而不是大小 (log.segment.bytes) 配置日志段大小。但是应该小心,因为这样做可能会由于频繁的日志段滚动而增加文件处理程序的数量。
未来的计划
Kafka 将为消息格式添加时间戳。参考
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
Kafka Protocol Doc 是使用 request/response/Offsets/Messages 的绝佳来源: https ://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol 您使用简单消费者示例正如以下代码演示状态的地方:
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000)
.build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);
设置 readOffset 以开始初始偏移量。但是您需要检查最大偏移量,上述方法将根据 addFetch 方法的最后一个参数中的 FetchSize 提供有限的偏移量。
使用 KafkaConsumer,您可以使用 Seek、SeekToBeginning 和 SeekToEnd 在流中移动。
此外,如果没有提供分区,它将寻找所有当前分配的分区的第一个偏移量。
你试过这个吗?
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
它将打印出给定主题的所有消息,在本例中为“test”。
此链接的更多详细信息https://kafka.apache.org/quickstart