35

我已经实现了一个 Apache Pig 脚本。当我执行脚本时,它会为特定步骤生成许多映射器,但该步骤只有一个减速器。由于这种情况(许多映射器,一个减速器),Hadoop 集群在单个减速器执行时几乎是空闲的。为了更好地利用集群的资源,我还希望有许多并行运行的减速器。

即使我使用 SET DEFAULT_PARALLEL 命令在 Pig 脚本中设置并行度,我仍然会导致只有 1 个减速器。

发出问题的代码部分如下:

SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

'inputData' 和 'inputDataGrouped' 别名在映射器中计算。

reducer 中的“pairs”和“pairsFlat”。

如果我通过删除带有 FLATTEN 命令的行来更改脚本 (pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (i​​tem1:int, item2:int);) 那么执行会产生 5 个减速器(因此是并行执行) .

似乎 FLATTEN 命令是问题所在,并且避免了创建许多减速器。

我怎样才能达到与 FLATTEN 相同的结果,但让脚本并行执行(使用许多减速器)?

编辑:

有两个 FOREACH 时的 EXPLAIN 计划(如上):

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
|   |
|   Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][0] - scope-23
    |   |
    |   Cast[int] - scope-27
    |   |
    |   |---Project[bytearray][1] - scope-26
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------


Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
    |   |
    |   Project[bag][0] - scope-39
    |
    |---pairs: New For Each(false)[bag] - scope-38
        |   |
        |   POUserFunc(GeneratePairsUDF)[bag] - scope-36
        |   |
        |   |---Project[bag][1] - scope-35
        |       |
        |       |---Project[bag][1] - scope-34
        |
        |---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

当只有一个 FOREACH 和 FLATTEN 包裹 UDF 时,请解释计划:

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
|   |
|   Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
    |   |
    |   Cast[chararray] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[int] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------


Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
    |   |
    |   POUserFunc(GeneratePairsUDF)[bag] - scope-33
    |   |
    |   |---Project[bag][1] - scope-32
    |       |
    |       |---Project[bag][1] - scope-31
    |
    |---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false
4

4 回答 4

3

如果 pig 对 pig 脚本中的每个步骤都使用配置 DEFAULT_PARALLEL 值,则无法确定。尝试 PARALLEL 以及您觉得需要时间的特定加入/组步骤(在您的情况下为 GROUP 步骤)。

 inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67;

如果它仍然无法正常工作,那么您可能需要查看数据是否存在偏度问题。

于 2014-06-17T11:12:35.513 回答
1

我尝试了“设置默认并行”和“PARALLEL 100”,但没有运气。Pig 仍然使用 1 个减速器。

事实证明,我必须为每条记录生成一个从 1 到 100 的随机数,并按该随机数对这些记录进行分组。

我们在分组上浪费时间,但对我来说更快,因为现在我可以使用更多的 reducer。

这是代码(SUBMITTER是我自己的UDF):

tmpRecord = FOREACH record GENERATE (int)(RANDOM()*100.0) as rnd, data;
groupTmpRecord = GROUP tmpRecord BY rnd;
result = FOREACH groupTmpRecord GENERATE FLATTEN(SUBMITTER(tmpRecord));
于 2014-06-19T23:17:51.433 回答
1

我认为数据存在偏差。只有少数映射器产生指数级的大输出。查看数据中键的分布。类似数据包含少数具有大量记录的组。

于 2014-06-17T07:28:45.563 回答
0

要回答您的问题,我们必须首先知道 pig 强制执行多少减速器来完成 - Global Rearrange 过程。因为根据我的理解,生成/投影不应该需要一个减速器。关于Flatten,我不能说同样的话。然而,我们从常识中知道,在展平期间,目标是从袋子中去嵌套元组,反之亦然。为此,属于一个包的所有元组绝对应该在同一个减速器中可用。我可能错了。但是任何人都可以在这里添加一些东西来让这个用户得到答案吗?

于 2016-10-08T01:15:11.293 回答