我已经设置了从 kafka 服务器获取输入数据的风暴拓扑。我使用 kafka-storm 包来获取数据。我已经在本地集群中成功地实现了 kafka 服务器和风暴拓扑之间的连接,但是我在从 kafka 服务器检索数据时遇到了一些问题。
kafka Spout 在运行时重复检索相同的消息,即使我设置spoutconfig.forceFromStart=false
和spoutconfig.startOffsetTime =-1
注意:当我停止并重新启动集群时,数据会根据最新的偏移量正确发送。
我已经设置了从 kafka 服务器获取输入数据的风暴拓扑。我使用 kafka-storm 包来获取数据。我已经在本地集群中成功地实现了 kafka 服务器和风暴拓扑之间的连接,但是我在从 kafka 服务器检索数据时遇到了一些问题。
kafka Spout 在运行时重复检索相同的消息,即使我设置spoutconfig.forceFromStart=false
和spoutconfig.startOffsetTime =-1
注意:当我停止并重新启动集群时,数据会根据最新的偏移量正确发送。