如果我在我的 Storm 拓扑中增加 Kafka Spout 的并行度,我怎样才能阻止它多次读取同一主题中的同一消息?
5 回答
如果是从kafka流入storm,那么请分享更多信息。
如果数据流从storm到kafka:
然后只需在您的代码中检查您的 TopologyBuilder。
它不应该是allGrouping,如果是则将其更改为shuffleGrouping
例子:
builder.setBolt("OUTPUTBOLT", new OutBoundBolt(boltConfig), 4)
.allGrouping("previous_bolt"); // this is wrong change it to
// shuffleGrouping
所有分组:流在所有螺栓的任务中复制。请谨慎使用此分组。
您应该检查消息是否得到正确确认。如果不是,则 spout 会将其视为失败并回复消息。
您需要指定消费者组。一旦指定,Kafka 只会将下一条消息发送给您的任何 spout。所有的 spout 都应该属于同一个消费者组。
在创建消费者时,请指定以下属性
props.put("group.id", a_groupId);
Storm 的 Kafka spout 将消费者偏移量持久保存到 Zookeeper,因此只要您不清除 Zookeeper 存储,它就不应多次读取同一消息。如果您看到一条消息被多次读取,也许检查偏移量是否被持久化到您的 zookeeper 实例?
我认为默认情况下,在本地运行时,Kafka spout 会启动自己的本地 Zookeeper 实例(与 Kafka 的 Zookeeper 分开),每次重新启动拓扑时,它的状态可能会重置。
如果你的 kafka spout 是 Opeque 那么你需要 topology.max.spout.pending<10 因为“挂起意味着元组还没有被确认或失败”所以,如果每个批次没有更多的元组并且少于挂起的计数, spout 试图达到最大 spout 挂起大小。
如果您的需要满足,您可以使用 Transactional Spout 来处理这个问题。