我已经实现了一个 Apache Pig 脚本。当我执行脚本时,它会为特定步骤生成许多映射器,但该步骤只有一个减速器。由于这种情况(许多映射器,一个减速器),Hadoop 集群在单个减速器执行时几乎是空闲的。为了更好地利用集群的资源,我还希望有许多并行运行的减速器。
即使我使用 SET DEFAULT_PARALLEL 命令在 Pig 脚本中设置并行度,我仍然会导致只有 1 个减速器。
发出问题的代码部分如下:
SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);
'inputData' 和 'inputDataGrouped' 别名在映射器中计算。
reducer 中的“pairs”和“pairsFlat”。
如果我通过删除带有 FLATTEN 命令的行来更改脚本 (pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);) 那么执行会产生 5 个减速器(因此是并行执行) .
似乎 FLATTEN 命令是问题所在,并且避免了创建许多减速器。
我怎样才能达到与 FLATTEN 相同的结果,但让脚本并行执行(使用许多减速器)?
编辑:
有两个 FOREACH 时的 EXPLAIN 计划(如上):
Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
| |
| Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
| |
| Cast[chararray] - scope-24
| |
| |---Project[bytearray][0] - scope-23
| |
| Cast[int] - scope-27
| |
| |---Project[bytearray][1] - scope-26
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------
Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
| |
| Project[bag][0] - scope-39
|
|---pairs: New For Each(false)[bag] - scope-38
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-36
| |
| |---Project[bag][1] - scope-35
| |
| |---Project[bag][1] - scope-34
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false
当只有一个 FOREACH 和 FLATTEN 包裹 UDF 时,请解释计划:
Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
| |
| Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
| |
| Cast[chararray] - scope-21
| |
| |---Project[bytearray][0] - scope-20
| |
| Cast[int] - scope-24
| |
| |---Project[bytearray][1] - scope-23
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------
Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-33
| |
| |---Project[bag][1] - scope-32
| |
| |---Project[bag][1] - scope-31
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false