我知道必须有办法做到这一点,但我无法弄清楚这一点。读取队列中的所有消息后,我需要停止 kafka 消费者。
有人可以提供这方面的任何信息吗?
我知道必须有办法做到这一点,但我无法弄清楚这一点。读取队列中的所有消息后,我需要停止 kafka 消费者。
有人可以提供这方面的任何信息吗?
您可以在启动消费者时传递参数:-consumer-timeout-ms 和一个值,如果在此期间没有读取任何消息,它将引发异常。例如,如果最近 2 秒内没有新消息到达,则停止消费者: kafka.consumer.ConsoleConsumer -consumer-timeout-ms 2000
您可以在此处查看此选项和所有其他输入选项
目前,Kafka 版本 2.11-2.1.1 有一个名为kafka-console-consumer.sh
.
它有一个新标志:--timeout-ms
.
基本上,这个标志是在没有新日志等待时退出前等待的最长时间。它以毫秒为单位。
您可以在阅读所有消息后使用此属性来结束您的控制台使用者。
您可以将 SimpleConsumerShell 与 no-wait-at-logend 选项一起使用。见SystemTools-SimpleConsumerShell
例如:
./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend
如果您对使用 Scala 客户端没有死心,请尝试使用kafkacat选项-e
告诉它在达到分区结束时退出。
例如消费来自 mytopic 分区 2 的所有消息,然后退出:
$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e
或者消费最后 3000 条消息然后退出:
$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e