0

当我在本地执行 Pig 脚本并指定本地 GeoIPASNum.dat 文件时,以下代码有效。但是,它在 MapReduce 分布式模式下运行时不起作用。我错过了什么?

猪工作

DEFINE AsnResolver AsnResolver('/hdfs/location/of/GeoIPASNum.dat');

loaded = LOAD 'log_file' Using PigStorage() AS (ip:chararray);

columned = FOREACH loaded GENERATE AsnResolver(ip);

STORE columned INTO 'output/' USING PigStorage();

AsnResolver.java

public class AsnResolver extends EvalFunc<String> {

    String ipAsnFile = null;

    @Override
    public String exec(Tuple input) throws IOException {
        try {
            LookupService lus = new LookupService(ipAsnFile,
                    LookupService.GEOIP_MEMORY_CACHE);
            return lus.getOrg((String) input.get(0));
        } catch (IOException e) {
        }

        return null;
    }

    public AsnResolver(String file) {
        ipAsnFile = file;
    }

    ...

}
4

2 回答 2

1

问题是您正在使用对 HDFS 路径的字符串引用,并且 LookupService 构造函数无法解析该文件。当您在本地运行它时,它可能会起作用,因为 LookupService 对本地 FS 中的文件没有问题。

覆盖 getCacheFiles 方法:

@Override
public List<String> getCacheFiles() {
    List<String> list = new ArrayList<String>(1);
    list.add(ipAsnFile + "#GeoIPASNum.dat");
    return list;
}

然后更改 LookupService 构造函数以使用分布式缓存对 "GeoIPASNum.dat" 的引用:

LookupService lus = new LookupService("GeoIPASNum.dat", LookupService.GEOIP_MEMORY_CACHE);
于 2013-10-06T03:01:32.667 回答
0

在 Pig 文档的此页面中搜索“分布式缓存”:http: //pig.apache.org/docs/r0.11.0/udf.html

它使用 getCacheFiles() 方法显示的示例应确保集群中的所有节点都可以访问该文件。

于 2013-10-04T22:38:05.510 回答