我改变了类WordCount
如下WordCountTopology
:
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if(count==null) count = 0;
count++;
counts.put(word, count);
OutputStream o;
try {
o = new FileOutputStream("~/abc.txt", true);
o.write(word.getBytes());
o.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
我在其中将单词写入文件abc.txt
。
当我WordCountTopology
在本地模式下运行(使用LocalCluster
)时,它工作得很好。但是在分布式模式下运行时(使用该StormSubmitter.submitTopology()
方法),WordCount
该类没有将单词写入,abc.txt
就好像该execute()
方法根本没有运行一样。谁能给我一些想法?非常感谢!
PS我确定我的nimbus、supervisor、ui、zookeeper运行正常,在127.0.0.1:8080可以看到任务。