1

我已经设置了从 kafka 服务器获取输入数据的风暴拓扑。我使用 kafka-storm 包来获取数据。我已经在本地集群中成功地实现了 kafka 服务器和风暴拓扑之间的连接,但是我在从 kafka 服务器检索数据时遇到了一些问题。

kafka Spout 在运行时重复检索相同的消息,即使我设置spoutconfig.forceFromStart=falsespoutconfig.startOffsetTime =-1

注意:当我停止并重新启动集群时,数据会根据最新的偏移量正确发送。

4

1 回答 1

2

我自己想通了,问题在于outputcollector ack()方法。我已经用 实现了螺栓收集器BaseBasicBolt,它没有承认 kafkaspout。我已经替换BaseRichBoltthis.collector.ack(tuple)手动制作。

现在它的工作正常

于 2015-02-09T07:53:46.180 回答