0

我正在按照这里的教程使用分布式缓存。我对代码稍作改动,使其与 Hadoop2.2 兼容。

我发现loadStopWords调用该方法时,会抛出IO异常:

我确认 stop_words.txt 已复制到HDFS. 我省略了 mapper 和 reducer 代码以使其变得简单。

这是我的代码:

public static final String LOCAL_STOPWORD_LIST =
              "/Users/sridhar/Documents/hadoop/stop_words.txt";

    public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

    //copies local file to HDFS and adds to Job's cache file
    static  void cacheStopWordList(Configuration conf, Job job) throws IOException, URISyntaxException {
        FileSystem fs = FileSystem.get(conf);
        URI hdfsPath = new URI(HDFS_STOPWORD_LIST);

        System.out.println("coping files to HDFS");

        // upload the file to hdfs. Overwrite any existing copy.
        fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST),
            new Path(hdfsPath));

        System.out.println("done copying HDFS");
        job.addCacheFile(hdfsPath);
      }

    protected void setup(Context context) {
            try {
              String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).toString();
              URI[] cacheFiles = context.getCacheFiles();

              System.out.println(Arrays.toString(cacheFiles));


              if (null != cacheFiles && cacheFiles.length > 0) {
                for (URI cacheURI : cacheFiles) {
                    System.out.println(cacheURI.toString());
                    System.out.println(stopwordCacheName);
                     System.out.println("-----------------");
                  if (cacheURI.toString().equals(stopwordCacheName)) {
                      System.out.println("****************************************");
                    loadStopWords(new Path(cacheURI)); // IT BREAKS HERE
                    System.out.println(stopWords);
                    break;
                  }
                }
              }
            } catch (IOException ioe) {
              System.err.println("IOException reading from distributed cache");
              System.err.println(ioe.toString());
            }
          }

        void loadStopWords(Path cachePath) throws IOException {
            // note use of regular java.io methods here - this is a local file now
            BufferedReader wordReader = new BufferedReader(
                new FileReader(cachePath.toString()));
            try {
              String line;
              this.stopWords = new HashSet<String>();
              while ((line = wordReader.readLine()) != null) {
                this.stopWords.add(line.toLowerCase());
              }
            } finally {
              wordReader.close();
            }
          }





public static void main(String[] args) throws IllegalArgumentException, IOException, InterruptedException, ClassNotFoundException, URISyntaxException {

Job job = new Job();
job.setJarByClass(LineIndexer.class);
job.setJobName("LineIndexer");
Configuration conf = job.getConfiguration();
cacheStopWordList(conf,job);
}
4

2 回答 2

0

我认为你应该尝试Path[] localPaths = context.getLocalCacheFiles();而不是context.getCacheFiles();让我知道它是否有效

于 2014-07-11T14:11:45.083 回答
-1

在您提供的链接中,提到使用 DistributedCache.addCacheFile()。这是para 要使用分布式缓存传播文件,请在设置作业时创建 DistributedCache 类的实例。使用 DistributedCache.addCacheFile() 方法添加应该发送到系统上所有节点的文件的名称。

而不是写

job.addCacheFile(hdfsPath);

尝试写作

DistributedCache.addCacheFile(hdfsPath, job.getConfiguration());
于 2014-07-08T11:55:32.653 回答