5

我改变了类WordCount如下WordCountTopology

public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if(count==null) count = 0;
        count++;
        counts.put(word, count);
        OutputStream o;
        try {
            o = new FileOutputStream("~/abc.txt", true);
            o.write(word.getBytes());
            o.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

我在其中将单词写入文件abc.txt

当我WordCountTopology在本地模式下运行(使用LocalCluster)时,它工作得很好。但是在分布式模式下运行时(使用该StormSubmitter.submitTopology()方法),WordCount该类没有将单词写入,abc.txt就好像该execute()方法根本没有运行一样。谁能给我一些想法?非常感谢!

PS我确定我的nimbus、supervisor、ui、zookeeper运行正常,在127.0.0.1:8080可以看到任务。

4

1 回答 1

2

主要问题是 abc.txt 文件的位置。该文件将在您提交拓扑的系统中创建。因此该文件在其他集群机器中不可用。您可以检查主管日志中的文件未找到错误。到解决这个问题,您需要一些 NFS 配置,通过该配置,所有集群机器可以共享公共位置。配置 NFS 后,在公共位置创建新文件,以便所有主管都可以使用该文件。

于 2014-01-15T12:16:47.423 回答