0

我正在运行一个风暴三叉戟拓扑,在两个不同的流中有两个不同的喷口。我的 spout 是 JMS spout 并使用 HDFS State 来持久化元组。

如果我只运行一个 spout,它工作正常,我将所有记录发布到 HDFS 中的 JMS 队列。

在运行具有连接到两个不同队列的两个 spout 的拓扑时,与我在 QUEUE 中发布的记录相比,我得到的记录更少。我在这里做错了什么吗。如果我这样做的方式有任何问题,请告诉我。

    TridentTopology topology = new TridentTopology();       
    topology.newStream("QueueSpout", TridentSpout).partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());       

    Stream TopicStream = topology.newStream("TopicSpout", TridentTopicSpout);
    TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
    TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
    TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());
4

1 回答 1

0

以下配置适用于拓扑。

这是因为我没有使用分区分组。

使用全局和批处理全局后它工作正常。

shuffle:使用随机循环算法在所有目标分区中均匀地重新分配元组

广播:每个元组都被复制到所有目标分区。这在 DRPC 期间很有用——例如,如果您需要对每个数据分区执行 stateQuery。

partitionBy:partitionBy 接受一组字段并基于该组字段进行语义分区。这些字段根据目标分区的数量进行散列和修改以选择目标分区。partitionBy 保证相同的字段集总是进入相同的目标分区。

global:所有元组都发送到同一个分区。为流中的所有批次选择相同的分区。

batchGlobal:批处理中的所有元组都发送到同一个分区。流中的不同批次可能会进入不同的分区。

partition:此方法接受一个自定义分区函数,该函数实现了 backtype.storm.grouping.CustomStreamGrouping

下面的拓扑配置可以根据需要正常工作。

    TridentTopology topology = new TridentTopology();       
    topology.newStream("QueueSpout", TridentSpout).batchGlobal().partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());     

    Stream TopicStream = topology.newStream("TopicSpout",   TridentTopicSpout).global();
    TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
    TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
    TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());
于 2015-11-13T21:59:34.843 回答