我有一系列 Pig 脚本,它们正在转换来自需要连接在一起的多个数据源的数亿条记录。在每个脚本快结束时,我达到了 JOIN 性能变得非常缓慢的地步。查看 Tez 视图中的 DAG,我看到它被分成相对较少的任务(通常 100-200 个),但每个任务都需要几个小时才能完成。任务描述显示它正在执行 HASH_JOIN。
有趣的是,我只有在 Tez 执行引擎上运行时才会遇到这个瓶颈。在 MapReduce 上,它仍然需要一段时间,但没有什么比我在 Tez 上的痛苦爬行更厉害了。但是,在 MapReduce 上运行是一个问题,因为我对 MapReduce 有一个问题,我在这里问了另一个问题。
这是我的代码示例(抱歉,我必须使代码非常通用才能发布在互联网上)。我想知道我能做些什么来消除这个瓶颈——指定并行性有帮助吗?我的方法有问题吗?
-- Incoming data:
-- A: hundreds of millions of rows, 19 fields
-- B: hundreds of millions of rows, 3 fields
-- C: hundreds of millions of rows, 5 fields
-- D: a few thousand rows, 5 fields
J = -- This reduces the size of A, but still probably in the hundreds of millions
FILTER A
BY qualifying == 1;
K = -- This is a one-to-one join that doesn't explode the number of rows in J
JOIN J BY Id
, B BY Id;
L =
FOREACH K
GENERATE J1 AS L1
, J2 AS L2
, J3 AS L3
, J4 AS L4
, J5 AS L5
, J6 AS L6
, J7 AS L7
, J8 AS L8
, B1 AS L9
, B2 AS L10
;
M = -- Reduces the size of C to around one hundred million rows
FILTER C
BY Code matches 'Code-.+';
M_WithYear =
FOREACH M
GENERATE *
, (int)REGEX_EXTRACT(Code, 'Code-.+-([0-9]+)', 1) AS year:int
;
SPLIT M_WithYear
INTO M_annual IF year <= (int)'$currentYear' -- roughly 75% of the data from M
, M_lifetime IF Code == 'Code-Lifetime'; -- roughly 25% of the data from M
-- Transformations for M_annual
N =
JOIN M_WithYear BY Id, D BY Id USING 'replicated';
O = -- This is where performance falls apart
JOIN N BY (Id, year, M7) -- M7 matches L7
, L BY (Id, year, L7);
P =
FOREACH O
GENERATE N1 AS P1
, N2 AS P2
, N3 AS P3
, N4 AS P4
, N5 AS P5
, N6 AS P6
, N7 AS P7
, N8 AS P8
, N9 AS P9
, L1 AS P10
, L2 AS P11
;
-- Transformations N-P above repeated for M_lifetime