我正在运行一个风暴三叉戟拓扑,在两个不同的流中有两个不同的喷口。我的 spout 是 JMS spout 并使用 HDFS State 来持久化元组。
如果我只运行一个 spout,它工作正常,我将所有记录发布到 HDFS 中的 JMS 队列。
在运行具有连接到两个不同队列的两个 spout 的拓扑时,与我在 QUEUE 中发布的记录相比,我得到的记录更少。我在这里做错了什么吗。如果我这样做的方式有任何问题,请告诉我。
TridentTopology topology = new TridentTopology();
topology.newStream("QueueSpout", TridentSpout).partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());
Stream TopicStream = topology.newStream("TopicSpout", TridentTopicSpout);
TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());