我已经开始我的生产者向 Kafka 发送数据,也开始我的消费者提取相同的数据。当我在 Apache Nifi 中使用 Consumekafka 处理器(kafka 版本 1.0)时,我脑海中几乎没有与 Kafka 消费者相关的查询。
Q.1) 当我第一次启动我的 ConsumeKafka 处理器时,我如何从开始和当前消息中读取消息?
Q.2)如果Kafka中的消费者关闭,如何在最后一条消费消息之后读取消息?
我们如何在使用 Apache Nifi 时实现以上两个?
我已经开始我的生产者向 Kafka 发送数据,也开始我的消费者提取相同的数据。当我在 Apache Nifi 中使用 Consumekafka 处理器(kafka 版本 1.0)时,我脑海中几乎没有与 Kafka 消费者相关的查询。
Q.1) 当我第一次启动我的 ConsumeKafka 处理器时,我如何从开始和当前消息中读取消息?
Q.2)如果Kafka中的消费者关闭,如何在最后一条消费消息之后读取消息?
我们如何在使用 Apache Nifi 时实现以上两个?
ConsumeKafka 处理器有一个名为“Offset Reset”的属性,当消费者组 id 没有先前的偏移量或偏移量不再存在时,将使用该属性。此属性的选项是“偏移最新”或“偏移最早”,默认为最新。
因此,如果您使用以前从未使用过的消费者组 id 启动 ConsumeKafka 处理器,那么它会从最新消息开始消费。之后,如果您启动和停止处理器,它将从上次消耗的偏移量开始。
如果您想再次使用“偏移重置”来强制它为最早或最新,那么您需要更改消费者组 ID,因为否则现有的消费者组将始终使用现有的偏移量开始。
您不能同时从头和当前阅读消息,您可以从头开始并一直阅读到当前,或者从当前开始。这是 Kafka 的工作方式,并不特定于 NiFi。