我有一个 Pig 程序,我试图计算两个袋子之间的最小中心。为了让它工作,我发现我需要将袋子组合成一个数据集。整个操作需要很长时间。我想从 UDF 中的磁盘打开其中一个包,或者能够在不需要 COGROUP 的情况下将另一个关系传递到 UDF 中......
代码:
# **** Load files for iteration ****
register myudfs.jar;
wordcounts = LOAD 'input/wordcounts.txt' USING PigStorage('\t') AS (PatentNumber:chararray, word:chararray, frequency:double);
centerassignments = load 'input/centerassignments/part-*' USING PigStorage('\t') AS (PatentNumber: chararray, oldCenter: chararray, newCenter: chararray);
kcenters = LOAD 'input/kcenters/part-*' USING PigStorage('\t') AS (CenterID:chararray, word:chararray, frequency:double);
kcentersa1 = CROSS centerassignments, kcenters;
kcentersa = FOREACH kcentersa1 GENERATE centerassignments::PatentNumber as PatentNumber, kcenters::CenterID as CenterID, kcenters::word as word, kcenters::frequency as frequency;
#***** Assign to nearest k-mean *******
assignpre1 = COGROUP wordcounts by PatentNumber, kcentersa by PatentNumber;
assignwork2 = FOREACH assignpre1 GENERATE group as PatentNumber, myudfs.kmeans(wordcounts, kcentersa) as CenterID;
基本上我的问题是,对于每个专利,我需要传递子关系(字数,kcenters)。为了做到这一点,我先做一个交叉,然后按 PatentNumber 做一个 COGROUP,以便得到一组 PatentNumber、{wordcounts}、{kcenters}。如果我能想出一种方法来传递关系或从 UDF 中打开中心,那么我可以通过 PatentNumber 对字数进行 GROUP 并运行 myudfs.kmeans(wordcount),这希望在没有 CROSS/COGROUP 的情况下更快。
这是一项昂贵的操作。目前这需要大约 20 分钟,并且似乎会占用 CPU/RAM。我在想如果没有 CROSS,它可能会更有效。我不确定它会更快,所以我想尝试一下。
无论如何,看起来从 Pig 内部调用 Loading 函数需要一个 PigContext 对象,而我不是从 evalfunc 获得的。为了使用 hadoop 文件系统,我还需要一些初始对象,但我不知道如何获取。所以我的问题是如何从 PIG UDF 中的 hadoop 文件系统打开文件?我还通过 main 运行 UDF 进行调试。所以我需要在调试模式下从普通文件系统加载。
另一个更好的主意是如果有一种方法可以将关系传递到 UDF 而无需 CROSS/COGROUP。这将是理想的,特别是如果关系驻留在内存中.. 即能够执行 myudfs.kmeans(wordcounts, kcenters) 而无需使用 kcenters 的 CROSS/COGROUP ...
但基本思想是用 IO 换取 RAM/CPU 周期。
无论如何,任何帮助都将不胜感激,PIG UDF 的文档除了最简单的 UDF 之外并没有得到很好的记录,即使在 UDF 手册中也是如此。