我编写了 Kafka Streaming 应用程序,它只根据某些条件过滤行并将其加载到 MongoDB。
流式处理工作正常,但由于我的代码存在一些缺陷,我想再次重新处理整个数据。
一种方法是杀死流媒体应用程序,更改消费者组 id,从 mongo 中删除数据并重新运行应用程序。
如何在不更改消费者组 ID 的情况下实现此场景。
<<我使用的是Kafka 0.10版本>>
非常感谢帕里
我编写了 Kafka Streaming 应用程序,它只根据某些条件过滤行并将其加载到 MongoDB。
流式处理工作正常,但由于我的代码存在一些缺陷,我想再次重新处理整个数据。
一种方法是杀死流媒体应用程序,更改消费者组 id,从 mongo 中删除数据并重新运行应用程序。
如何在不更改消费者组 ID 的情况下实现此场景。
<<我使用的是Kafka 0.10版本>>
非常感谢帕里
Apache Kafka 0.10.0.1(在 8 月发布,而最初的问题是在 7 月提出的)附带一个新的 Kafka Streams 应用程序重置工具,这是一个比简单重命名更简单、更好/更清洁的解决方案application.id
。
您可以通过脚本执行该工具,该脚本bin/kafka-streams-application-reset.sh
还将打印使用/帮助消息。
例子:
# Run this only after ALL application instances were stopped!
$ bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic \
--bootstrap-servers brokerHost:9092 \
--zookeeper zookeeperHost:2181
也就是说,我建议阅读前面提到的 Matthias J. Sax 所写的使用 Kafka Streams 进行数据重新处理:重置流应用程序以获取更多详细信息。那篇文章还解释了为什么简单地重命名application.id
(这是迄今为止的解决方法)不是最好的主意。
收到来自 Matthias J. Sax matthias@confluent.io 的更新 -
目前,更改应用程序 ID 是最好的方法。正确清理应用程序状态有点棘手。我们目前正在对此进行改进——应该很快就会推出。
见https://issues.apache.org/jira/browse/KAFKA-3185
干杯帕里