2

我已经开始我的生产者向 Kafka 发送数据,也开始我的消费者提取相同的数据。当我在 Apache Nifi 中使用 Consumekafka 处理器(kafka 版本 1.0)时,我脑海中几乎没有与 Kafka 消费者相关的查询。

Q.1) 当我第一次启动我的 ConsumeKafka 处理器时,我如何从开始和当前消息中读取消息?

Q.2)如果Kafka中的消费者关闭,如何在最后一条消费消息之后读取消息?

我们如何在使用 Apache Nifi 时实现以上两个?

4

1 回答 1

1

ConsumeKafka 处理器有一个名为“Offset Reset”的属性,当消费者组 id 没有先前的偏移量或偏移量不再存在时,将使用该属性。此属性的选项是“偏移最新”或“偏移最早”,默认为最新。

因此,如果您使用以前从未使用过的消费者组 id 启动 ConsumeKafka 处理器,那么它会从最新消息开始消费。之后,如果您启动和停止处理器,它将从上次消耗的偏移量开始。

如果您想再次使用“偏移重置”来强制它为最早或最新,那么您需要更改消费者组 ID,因为否则现有的消费者组将始终使用现有的偏移量开始。

您不能同时从头和当前阅读消息,您可以从头开始并一直阅读到当前,或者从当前开始。这是 Kafka 的工作方式,并不特定于 NiFi。

于 2018-07-30T14:19:54.977 回答