0

我让 Pig (0.10) 将数据从 hdfs 加载到 hbase 中。原始记录没有唯一的行键,所以我有一个 UDF 构造:

public class Foo extends EvalFunc<Tuple> {
    // FIXME: If there are multiple map jobs for the same batch,
    // they will reuse the serial numbers.
    // Need to add something to figure out a distinct per task #
    private int task_id=0;
    private long serial=0L;

    public Tuple exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try {
            Integer batch_id=(Integer)input.get(0);
            String rowkey=String.format("%7d%3d%9d", batch_id, task_id, serial++);
            // ... compute other values for the return Tuple.
        }
    }
}

我的理解是,如果 pig 为同一个输入数据集启动两个不同的映射作业(由于超出块大小或从目录加载时具有多个输入文件),每个都将是一个单独的 Java 实例,因此Foo.serial 会有多个独立的副本;我的 rowkeys 不会是唯一的,我将覆盖我试图加载到 HBase 中的许多记录。

如果我的 UDF 可以确定它属于哪个映射器,那么冲突就会消失。我可以回退到 IP 地址 + 进程 ID,但这相当浪费。

4

1 回答 1

0

查看DataFu集合中的 Enumerate UDF 。这将取一个袋子并为每个元素分配一个数字 1 到 N,其中 N 是袋子的大小。不幸的副作用是我相信你的所有数据都必须通过一个 reducer。但从你的描述看来,这可能不是什么大问题。(听起来数据只是有时大到需要在多个映射器中拆分。)

您可以使用 将所有数据简单地分组到一个包中GROUP ... ALL,然后枚举该包。然后,您可以使用此编号构造一个自定义行键,该编号对于包中的每条记录都是唯一的。

于 2013-04-01T21:11:16.510 回答