3

我有一些缓慢变化的元数据,它们实时存储在 HDFS 上。我想写一个猪作业,将这些行压缩到每个键的最新行。

例如,对于这些数据(为清楚起见添加了列标题):

ts   meta   key
--   ----   ---
1    foo    id1
2    que    id2
3    que    id2
4    foo    id1
5    pasa   id2
6    pasa   id2
7    foo    id1
8    pasa   id2
9    pasa   id2
10   pasa   id2
11   pasa   id2
12   hombre id2
13   foo    id1
14   foo    id1
15   hombre id2
16   bar    id1
17   bar    id1
18   bar    id1
19   bar    id1
20   bar    id1

我希望得到输出:

15   hombre id2
20   bar    id1

我刚刚开始学习 Pig Latin 的来龙去脉——在 pig 或某个库中是否有内置的方法可以做到这一点,或者我应该看看写一个 UDF?

4

2 回答 2

6

这是嵌套 foreach 的好地方:

A = LOAD '$input' AS (ts:int, meta:chararray, key:chararray);
B =
    FOREACH (GROUP A BY key) {
        byts = ORDER A BY ts DESC;
        newest = LIMIT byts 1;
        GENERATE FLATTEN(newest);
    };

测试您的数据(我将其转换为制表符分隔):

(20,bar,id1)
(15,hombre,id2)

使用这种方法,即使您有两行带有最新时间戳的行,也可以保证每个键只有一行(尽管这对您的数据可能无关紧要)。

于 2013-08-26T21:13:56.753 回答
1

您可以这样做的一种方法是:

A = LOAD 'myinput' USING PigStorage(' ') 
    AS (ts:int, meta:chararray, key:chararray) ;

-- Group by key, then find the newest ts for each key
B = FOREACH (GROUP A BY key)
    GENERATE MAX(A.ts) AS newest,
             FLATTEN(A) ;

-- Now that each line has the newest (appropriate) ts, 
-- we can use it to filter the input
C = FILTER B BY newest == ts ;

输出:

(20,20,bar,id1)
(15,15,hombre,id2)

请注意,如果您不想拥有副本ts,则可以现在将其投影出来,如下所示:

C = FOREACH (FILTER B BY newest == ts)
    GENERATE A::ts AS ts, A::meta AS meta, A::key AS key ;

但最好现在就不要管多余ts的部分,然后将其作为以后的一部分进行计划,FOREACH以限制工作的数量。

更新:ts此方法将返回每个键最大的所有行。例如,如果数据是这样的:

(11,nope,id1)
(20,foo,id1)
(20,bar,id1)

然后它将同时返回foobar

于 2013-08-26T19:14:06.777 回答