I use the following approaches. (My JOIN approach is very similar, but it does not replicate diff's behavior with duplicated lines.) As this was asked some time ago: perhaps you were only using one reducer, since Pig gained an algorithm for adjusting the number of reducers automatically in version 0.8?
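If the job really is running with a single reducer, the parallelism can also be set by hand; a minimal sketch, assuming Pig 0.8+ (the reducer count of 18 is only illustrative):
-- Script-wide default for blocking operators such as GROUP and JOIN
SET default_parallel 18;
-- ...or per operator, via the PARALLEL clause
combined = JOIN a BY First FULL OUTER, b BY Second PARALLEL 18;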
- Both approaches I use are within a few percent of each other in performance, but they do not treat duplicates the same way
- The JOIN approach collapses duplicates (so if one file has more duplicates than the other, this approach will not output the extra duplicates)
- The UNION approach works like the Unix diff(1) tool and will return the correct number of extra duplicates for the correct file
- Unlike the Unix diff(1) tool, order is not important (effectively the JOIN approach performs sort -u <foo.txt> | diff while UNION performs sort <foo> | diff)
- If you have an incredible number (~thousands) of duplicate lines, then things will slow down because of the joins (if your usage allows, perform a DISTINCT on the raw data first)
- If your lines are very long (e.g. >1KB in size), then it is recommended to use the DataFu MD5 UDF and diff only over the hashes, then JOIN with your original files to get the original rows back before outputting (a sketch follows this list)
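For the long-line case, a minimal sketch of the hashing idea, assuming DataFu's MD5 UDF is available (the jar name below is illustrative):
REGISTER datafu.jar;
DEFINE MD5 datafu.pig.hash.MD5();
-- Carry a compact hash alongside each long row
a_raw    = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
a_hashed = FOREACH a_raw GENERATE MD5(Row) AS Hash, Row;
-- ...do the same for b, run either approach below on Hash instead of Row,
-- then JOIN the unique hashes back against a_hashed/b_hashed to recover Row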
Using JOIN:
SET job.name 'Diff(1) Via Join';
-- Erase Outputs
rmf first_only
rmf second_only
-- Process Inputs
a = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS First: chararray;
b = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Second: chararray;
-- Combine Data
combined = JOIN a BY First FULL OUTER, b BY Second;
-- Output Data
SPLIT combined INTO first_raw IF Second IS NULL,
second_raw IF First IS NULL;
first_only = FOREACH first_raw GENERATE First;
second_only = FOREACH second_raw GENERATE Second;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();
Using UNION:
SET job.name 'Diff(1)';
-- Erase Outputs
rmf first_only
rmf second_only
-- Process Inputs
a_raw = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
b_raw = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
a_tagged = FOREACH a_raw GENERATE Row, (int)1 AS File;
b_tagged = FOREACH b_raw GENERATE Row, (int)2 AS File;
-- Combine Data
combined = UNION a_tagged, b_tagged;
c_group = GROUP combined BY Row;
-- Find Unique Lines
%declare NULL_BAG 'TOBAG(((chararray)\'place_holder\',(int)0))'
-- $NULL_BAG yields a single placeholder tuple (File == 0) for lines whose
-- counts match in both files; the SPLIT below silently drops it. Otherwise
-- TOP is used simply to take the surplus N copies of the line from
-- whichever file has more of them.
counts = FOREACH c_group {
    firsts  = FILTER combined BY File == 1;
    seconds = FILTER combined BY File == 2;
    GENERATE
        FLATTEN(
            (COUNT(firsts) - COUNT(seconds) == (long)0 ? $NULL_BAG :
                (COUNT(firsts) - COUNT(seconds) > 0 ?
                    TOP((int)(COUNT(firsts) - COUNT(seconds)), 0, firsts) :
                    TOP((int)(COUNT(seconds) - COUNT(firsts)), 0, seconds))
            )
        ) AS (Row, File);
};
-- Output Data
SPLIT counts INTO first_only_raw IF File == 1,
second_only_raw IF File == 2;
first_only = FOREACH first_only_raw GENERATE Row;
second_only = FOREACH second_only_raw GENERATE Row;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();
Performance
- It took about 10 minutes to diff over 200GB (1,055,687,930 lines) of LZO-compressed input on 18 nodes.
- Each approach only takes one Map/Reduce pass.
- This works out to roughly 1.8GB diffed per node per minute (not a huge throughput, but on my system it seems diff(1) only operates in memory, while Hadoop leverages streaming disks).