我是 Storm 的新手。我想使用一个名为 'tileClean' 的螺栓来发射单个Stream
,而其他五个螺栓同时接收Stream
。像这样:
流图
如您所见,“一、二、三、四、五”螺栓将同时收到相同的数据。但实际上,“一、二、三、四、五”螺栓无法接收任何数据。有我的代码:
@Override
public void execute(TupleWindow inputWindow) {
logger.debug("clean");
List<Tuple> tuples = inputWindow.get();
//logger.debug("clean phrase. tuple size is : {}", tuples.size());
for (Tuple input : tuples) {
// some other code..
//this._collector.emit(input, new Values(nal));
this._collector.emit("stream_id_one", input, new Values(nal));
this._collector.emit("stream_id_two", input, new Values(nal));
this._collector.emit("stream_id_three", input, new Values(nal));
this._collector.emit("stream_id_four", input, new Values(nal));
this._collector.emit("stream_id_five", input, new Values(nal));
this._collector.ack(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_one", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_two", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_three", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_four", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_five", new Fields(BoltConstant.EMIT_LOGOBJ));
}
拓扑集是:
builder.setBolt("tileClean", cleanBolt, 1).shuffleGrouping("assembly");
builder.setBolt("OneBolt", OneBolt, 1).shuffleGrouping("tileClean", "stream_id_one");
builder.setBolt("TwoBolt", TwoBolt, 1).shuffleGrouping("tileClean", "stream_id_two");
builder.setBolt("ThreeBolt", ThreeBolt, 1).shuffleGrouping("tileClean", "stream_id_three");
builder.setBolt("FourBolt", FourBolt, 1).shuffleGrouping("tileClean", "stream_id_four");
builder.setBolt("FiveBolt", FiveBolt, 1).shuffleGrouping("tileClean", "stream_id_five");
tileClean
可以接收从 发射的元组assymble
,但其他螺栓不能接收。
我的代码有什么不正确的吗?