
I am trying to write a TridentTopology with multiple bolts. I want one bolt to subscribe to a specific stream emitted by another bolt, as shown below.

TridentTopologyBuilder tridentTopologyBuilder = new TridentTopologyBuilder();
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
                                            new Values("the cow jumped over the moon"),
                                            new Values("the man went to the store and bought some candy"),
                                            new Values("four score and seven years ago"),
                                            new Values("how many apples can you eat"));

tridentTopologyBuilder.setSpout("tridentSpout", "spoutStream", "spoutId", spout, 2, "spoutBatch");
Map<String, String> batchGroups = new HashMap<>();
batchGroups.put("boltStream", "boltBatch");
tridentTopologyBuilder.setBolt("tridentBolt", new TridentTestBolt(), 1, Sets.newHashSet("spoutBatch"), batchGroups).shuffleGrouping("tridentSpout", "spoutStream");

tridentTopologyBuilder.setBolt("tridentBolt2", new TridentTestBolt2(), 1, new HashSet<>(), batchGroups).shuffleGrouping("tridentBolt", "boltStream");

LocalCluster cluster = new LocalCluster();
Config config = new Config();
config.setDebug(true);
cluster.submitTopology("TridentTopology", config, tridentTopologyBuilder.buildTopology(new HashMap<>()));

I get the following exception:

Error: InvalidTopologyException(msg:Component: [tridentBolt2] subscribes from non-existent stream: [$coord-boltBatch] of component [tridentBolt])

I also declare the stream using the declareStream method of OutputFieldsDeclarer:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("boltStream", new Fields("sentence"));
}

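Subscribing to a specific stream of another bolt works in a normal (non-Trident) topology; the problem only occurs in the Trident topology. Also, what should be passed as batchGroups?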
