我正在使用 kafka spout 来消费消息。但是如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始?Kafka spout 让我们指定从哪里消费的时间戳,但我怎么知道时间戳?
3 回答
spoutConfig.forceStartOffsetTime(-1);
它将选择围绕该时间戳写入的最新偏移量来开始消费。您可以通过传入 -1 强制 spout 始终从最新的偏移量开始,也可以通过传入 -2 强制它从最早的偏移量开始。
如果您使用的是 KafkaSpout,请确保以下内容:
- 在您的 SpoutConfig 中,“id”和“zkroot”在重新部署新版本的拓扑后不要更改。Storm 使用“zkroot”、“id”将主题偏移量存储到 Zookeeper 中
- KafkaConfig.forceFromStart 设置为 false。
KafkaSpout 将偏移量存储到 Zookeeper 中。如果在 KafkaSpout 的 KafkaConfig 中将 forceFromStart 设置为 true(首次部署拓扑时可能会出现这种情况),则在重新部署期间要非常小心,它将忽略存储的 zookeeper 偏移量。确保将其设置为 false。
考虑编写拓扑,以便在执行拓扑的 main() 方法时从属性文件中读取 KafkaConfig.forceFromStart 值。这将允许您的管理员控制是否重播 Kafka 消息。
基本上,事件的顺序将是:
第一次通过从以下属性开始读取来启动拓扑:
forceFromStart = true startOffsetTime = -2
上面的道具会强制它从话题的开头开始。记住要同时拥有这两个属性,因为forceFromStart
告诉storm读取startOffsetTime
属性并使用设置的值来确定从哪里开始读取,并忽略zookeeper偏移量。
从现在开始,您的拓扑将运行,并且 zookeeper 将保持偏移量。如果您的工作人员死亡,它将由主管启动并开始从 zookeeper 中的偏移量读取。
现在,如果您想重新启动拓扑,并且想从关闭前停止的位置读取,请使用以下属性并重新启动拓扑:
forceFromStart = false
通过上面的属性,你告诉storm不要读取startOffsetTime
值,而是使用在关闭拓扑之前已经维护的zookeeper偏移量。
从现在开始,每次重新启动拓扑时,它都会从原来的位置读取。
如果要重新启动拓扑并从主题的头部/顶部读取,请使用以下属性并重新启动拓扑:
forceFromStart = true startOffsetTime = -1
通过上述属性,您告诉storm忽略zookeeper偏移并从作为主题提示的最新偏移开始。