从抽象的角度来看,Apache Kafka 将数据存储在主题中。消费者可以读取此数据。
我想要一个(监视器)消费者,它可以对特定年龄的数据进行 greps。监视器应该向子系统发送一条警告,即记录仍未读取,如果它们达到保留时间,将被 Kafka 丢弃。
直到现在我都找不到合适的方法。
从抽象的角度来看,Apache Kafka 将数据存储在主题中。消费者可以读取此数据。
我想要一个(监视器)消费者,它可以对特定年龄的数据进行 greps。监视器应该向子系统发送一条警告,即记录仍未读取,如果它们达到保留时间,将被 Kafka 丢弃。
直到现在我都找不到合适的方法。
您可以使用KafkaConsumer.offsetsForTimes()
将消息映射到日期。
例如,如果您使用昨天的日期调用它并返回偏移量 X,那么任何偏移量小于 X 的消息都比昨天更旧。
然后,如果您面临丢弃未处理记录的风险,您的逻辑可以从消费者的当前位置中计算出来。
请注意,目前正在讨论一个 KIP 来公开指标以跟踪它:https ://cwiki.apache.org/confluence/display/KAFKA/KIP-223+-+Add+per-topic+min+lead+and+每个分区+lead+metrics+to+KafkaConsumer