8

我想使用分布式缓存来允许我的映射器访问数据。主要是,我正在使用命令

DistributedCache.addCacheFile(new URI("/user/peter/cacheFile/testCache1"), conf);

其中 /user/peter/cacheFile/testCache1 是 hdfs 中存在的文件

然后,我的设置函数如下所示:

public void setup(Context context) throws IOException, InterruptedException{
    Configuration conf = context.getConfiguration();
    Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
    //etc
}

但是,此 localFiles 数组始终为空。

我最初在单主机集群上运行以进行测试,但我读到这将阻止分布式缓存工作。我尝试使用伪分布式,但这也不起作用

我正在使用 hadoop 1.0.3

谢谢彼得

4

4 回答 4

35

这里的问题是我正在执行以下操作:

Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
DistributedCache.addCacheFile(new URI("/user/peter/cacheFile/testCache1"), conf);

由于 Job 构造函数会生成 conf 实例的内部副本,因此之后添加缓存文件不会影响任何事情。相反,我应该这样做:

Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI("/user/peter/cacheFile/testCache1"), conf);
Job job = new Job(conf, "wordcount");

现在它可以工作了。感谢 Harsh on hadoop 用户列表的帮助。

于 2012-12-09T02:27:46.527 回答
11
Configuration conf = new Configuration();  
Job job = new Job(conf, "wordcount");
DistributedCache.addCacheFile(new URI("/userpetercacheFiletestCache1"),job.getConfiguration());

你也可以这样做。

于 2013-02-11T09:53:51.780 回答
4

一旦作业被分配给一个配置对象,即Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");

然后如果处理 conf 的属性,如下所示,例如

conf.set("demiliter","|");

或者

DistributedCache.addCacheFile(new URI("/user/peter/cacheFile/testCache1"), conf);

此类更改不会反映在伪集群或集群中,无论它如何与本地环境一起工作。

于 2013-06-06T09:03:48.817 回答
2

这个版本的代码(与上述结构略有不同)一直对我有用。

//in main(String [] args)
Job job = new Job(conf,"Word Count"); 
...
DistributedCache.addCacheFile(new URI(/user/peter/cacheFile/testCache1), job.getConfiguration());

我没有在 Mapper 代码中看到完整的 setup() 函数

public void setup(Context context) throws IOException, InterruptedException {

    Configuration conf = context.getConfiguration();
    FileSystem fs = FileSystem.getLocal(conf);

    Path[] dataFile = DistributedCache.getLocalCacheFiles(conf);

    // [0] because we added just one file.
    BufferedReader cacheReader = new BufferedReader(new InputStreamReader(fs.open(dataFile[0])));
    // now one can use BufferedReader's readLine() to read data

}
于 2014-08-10T00:12:28.340 回答