1

如果我在我的 Storm 拓扑中增加 Kafka Spout 的并行度,我怎样才能阻止它多次读取同一主题中的同一消息?

4

5 回答 5

0

如果是从kafka流入storm,那么请分享更多信息。

如果数据流从storm到kafka:

然后只需在您的代码中检查您的 TopologyBuilder。

不应该是allGrouping,如果是则将其更改为shuffleGrouping

例子:

    builder.setBolt("OUTPUTBOLT", new OutBoundBolt(boltConfig), 4)
            .allGrouping("previous_bolt"); // this is wrong change it to
                                            // shuffleGrouping

所有分组:流在所有螺栓的任务中复制。请谨慎使用此分组。

于 2015-03-22T00:51:56.540 回答
0

您应该检查消息是否得到正确确认。如果不是,则 spout 会将其视为失败并回复消息。

于 2015-02-20T10:38:46.960 回答
0

您需要指定消费者组。一旦指定,Kafka 只会将下一条消息发送给您的任何 spout。所有的 spout 都应该属于同一个消费者组。

在创建消费者时,请指定以下属性

props.put("group.id", a_groupId);

于 2015-03-24T14:11:52.483 回答
0

Storm 的 Kafka spout 将消费者偏移量持久保存到 Zookeeper,因此只要您不清除 Zookeeper 存储,它就不应多次读取同一消息。如果您看到一条消息被多次读取,也许检查偏移量是否被持久化到您的 zookeeper 实例?

我认为默认情况下,在本地运行时,Kafka spout 会启动自己的本地 Zookeeper 实例(与 Kafka 的 Zookeeper 分开),每次重新启动拓扑时,它的状态可能会重置。

于 2015-02-19T17:12:39.233 回答
0

如果你的 kafka spout 是 Opeque 那么你需要 topology.max.spout.pending<10 因为“挂起意味着元组还没有被确认或失败”所以,如果每个批次没有更多的元组并且少于挂起的计数, spout 试图达到最大 spout 挂起大小。

如果您的需要满足,您可以使用 Transactional Spout 来处理这个问题。

于 2017-04-20T12:57:19.600 回答