4

我正在使用 kafka spout 来消费消息。但是如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始?Kafka spout 让我们指定从哪里消费的时间戳,但我怎么知道时间戳?

4

3 回答 3

2

spoutConfig.forceStartOffsetTime(-1);

它将选择围绕该时间戳写入的最新偏移量来开始消费。您可以通过传入 -1 强制 spout 始终从最新的偏移量开始,也可以通过传入 -2 强制它从最早的偏移量开始。

参考

于 2013-12-20T06:50:21.580 回答
1

如果您使用的是 KafkaSpout,请确保以下内容:

  1. 在您的 SpoutConfig 中,“id”和“zkroot”在重新部署新版本的拓扑后不要更改。Storm 使用“zkroot”、“id”将主题偏移量存储到 Zookeeper 中
  2. KafkaConfig.forceFromStart 设置为 false。

KafkaSpout 将偏移量存储到 Zookeeper 中。如果在 KafkaSpout 的 KafkaConfig 中将 forceFromStart 设置为 true(首次部署拓扑时可能会出现这种情况),则在重新部署期间要非常小心,它将忽略存储的 zookeeper 偏移量。确保将其设置为 false。

考虑编写拓扑,以便在执行拓扑的 main() 方法时从属性文件中读取 KafkaConfig.forceFromStart 值。这将允许您的管理员控制是否重播 Kafka 消息。

于 2015-04-14T12:25:04.793 回答
0

基本上,事件的顺序将是:

  1. 第一次通过从以下属性开始读取来启动拓扑:

    forceFromStart = true
    
    startOffsetTime = -2
    

上面的道具会强制它从话题的开头开始。记住要同时拥有这两个属性,因为forceFromStart告诉storm读取startOffsetTime属性并使用设置的值来确定从哪里开始读取,并忽略zookeeper偏移量。

从现在开始,您的拓扑将运行,并且 zookeeper 将保持偏移量。如果您的工作人员死亡,它将由主管启动并开始从 zookeeper 中的偏移量读取。

  1. 现在,如果您想重新启动拓扑,并且想从关闭前停止的位置读取,请使用以下属性并重新启动拓扑:

    forceFromStart = false
    

通过上面的属性,你告诉storm不要读取startOffsetTime值,而是使用在关闭拓扑之前已经维护的zookeeper偏移量。

从现在开始,每次重新启动拓扑时,它都会从原来的位置读取。

  1. 如果要重新启动拓扑并从主题的头部/顶部读取,请使用以下属性并重新启动拓扑:

    forceFromStart = true
    
    startOffsetTime = -1
    

通过上述属性,您告诉storm忽略zookeeper偏移并从作为主题提示的最新偏移开始。

于 2016-08-12T03:22:54.117 回答