1

我有一些数据由 0 到 200 几百万范围内的 id 键入,我需要将其拆分为 0-5mil、5mil-10mil 等范围内的美元。

我正在尝试在最后一部分使用 Hadoop 上的自定义分区器,以便我的代码的最后一部分看起来像这样:

Conns = FOREACH ConnsGrouped GENERATE group as memberId, $1.companyId as companyIds;
ConnsPartitioned = DISTINCT Conns PARTITION BY com.mypackage.SearchNodePartitioner PARALLEL 50;

rmf $connections_file

Store ConnsPartitioned INTO 'test' using AvroStorage(...);

我的分区器如下所示:

public class SearchNodePartitioner<Long, V> implements Partitioner<Long, V>
{
    @Override
    public void configure(JobConf conf) 
    {
        // Nothing
    }

    @Override
    public int getPartition(Long key, V value, int numPartitions) 
    {
       return new Double(Math.floor(key / (5.0 * Math.pow(10, 6)))).intValue() % numPartitions;
    }

}

但它似乎根本没有被调用。即使我用return 1;跨文件的数据替换返回行似乎是散列分布的默认行为。

4

2 回答 2

1

DISTINCT + 自定义分区器的答案是:你不能再这样做了(正如我刚刚发现的那样)。DISTINCT 现在使用优化的特殊分区器。

看:

http://mail-archives.apache.org/mod_mbox/pig-user/201307.mbox/%3C14FE3AC3-DBA5-4898-AF94-0C34819A0D8B%40hortonworks.com%3E

https://issues.apache.org/jira/browse/PIG-3385

一种解决方法:

A = //一些元组...;

B = GROUP A BY 字段 PARTITION BY 自定义;

使用 .... 将 B 存储到“foo”中;

之后:

B = 加载 'foo' 使用 ...;

A = FOREACH B 生成展平($1);

于 2013-07-19T13:28:18.373 回答
0

您可以这样做的一种方法是:

A = LOAD ............
SPLIT A INTO B IF <your range condition> , C IF < your range condition>
STORE B ...
STORE C ...

或者你可以试试这个:

 B = FILTER A BY $1 >= <lower_Range> AND $1 <= <upper_Range>;

此外,由于您已经编写了自定义分区器,因此使用 MapReduce 很容易实现这一点。

您的 Map Class 只会发出对,而您的自定义分区器会将适当的值范围发送到给定的 reducer。但是,我不确定在对输入数据进行分区后您到底想做什么,所以我无法评论减速器必须做什么。

您可以在 Main 方法中将自定义分区器类设置为:

Job.setPartitionerClass(<your custom partitioner class>);
于 2013-07-09T22:55:47.840 回答