
We are using pig-0.11.0-cdh4.3.0 with a CDH4 cluster, and we need to deduplicate some web logs. The idea behind the solution (expressed in SQL) is something like this:

SELECT
     T1.browser,
     T1.click_type,
     T1.referrer,
     T1.datetime,
     T2.datetime
FROM
     My_Table T1
INNER JOIN My_Table T2 ON
     T2.browser = T1.browser AND
     T2.click_type = T1.click_type AND
     T2.referrer = T1.referrer AND
     T2.datetime > T1.datetime AND
     T2.datetime <= DATEADD(mi, 1, T1.datetime)

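I took the SQL above from here; it finds duplicate records that occur within 1 minute of each other. I was hoping to implement a similar solution in Pig, but Pig apparently does not support joining on an expression like the one this join needs (only on fields). Do you know how to deduplicate events that occur within 1 minute of each other in Pig? Thanks!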
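To pin down what the SQL self-join treats as a duplicate, here is a brute-force Python rendering of the same join condition. It is a sketch under assumptions: rows are plain tuples `(browser, click_type, referrer, datetime)`, `datetime` is stored as epoch seconds, and the function name `self_join_pairs` is hypothetical. It is quadratic, so it only serves as a reference for checking a Pig solution on small samples.

```python
def self_join_pairs(rows, window=60):
    """Return the (T1.datetime, T2.datetime) pairs the SQL self-join would produce.

    rows: list of (browser, click_type, referrer, datetime) tuples, with
    datetime assumed to be epoch seconds. window=60 stands in for
    DATEADD(mi, 1, T1.datetime).
    """
    pairs = []
    for b1, c1, r1, t1 in rows:
        for b2, c2, r2, t2 in rows:
            same_key = (b1, c1, r1) == (b2, c2, r2)
            # T2.datetime > T1.datetime AND T2.datetime <= T1.datetime + 1 minute
            if same_key and t1 < t2 <= t1 + window:
                pairs.append((t1, t2))
    return pairs


rows = [
    ("firefox", "click", "google", 0),
    ("firefox", "click", "google", 30),
    ("firefox", "click", "google", 100),
    ("chrome", "click", "google", 10),
]
print(self_join_pairs(rows))  # [(0, 30)]
```

Each `t2` in the result marks a row that has an earlier matching event at most one minute before it, i.e. a row the join considers a duplicate.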


4 Answers


Aleks and Marq,

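  -- assumes datetime is a long holding epoch seconds; adjust the divisor otherwise
  records_group = group records by (browser, click_type, referrer);

  with_max = FOREACH records_group
             GENERATE FLATTEN(records), MAX(records.datetime) as maxDt;

  -- bucket each record by whole minutes before the latest event in its group
  bucketed = FOREACH with_max GENERATE browser, click_type, referrer, datetime,
             (maxDt - datetime) / 60 as maxDtgroup;

  regroup = group bucketed by (browser, click_type, referrer, maxDtgroup);

  -- keep one (arbitrary) record per bucket
  dedup = FOREACH regroup { top = LIMIT bucketed 1; GENERATE FLATTEN(top); };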

  regroup = group with_min by (browser, click_type, referrer, maxDtgroup);

Regroup using maxDtgroup as part of the key, then keep the top record from each group.
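The group, MAX, regroup, LIMIT 1 sequence can be mirrored in plain Python to see which rows survive. This is a sketch that assumes `datetime` is epoch seconds and that the distance from the group's latest event is bucketed into whole minutes. The function name `dedup_by_distance_from_max` is hypothetical.

```python
def dedup_by_distance_from_max(rows, bucket=60):
    """Keep one row per (browser, click_type, referrer, bucket) group.

    rows: list of (browser, click_type, referrer, datetime) tuples, datetime
    assumed to be epoch seconds. The bucket is the number of whole `bucket`-second
    intervals between a row and the latest row of its group.
    """
    # Step 1: GROUP BY (browser, click_type, referrer) and take MAX(datetime).
    latest = {}
    for b, c, r, t in rows:
        latest[(b, c, r)] = max(t, latest.get((b, c, r), t))

    # Step 2: regroup by (key, bucket) and keep the first row seen (like LIMIT 1).
    kept = {}
    for b, c, r, t in rows:
        bucket_id = (latest[(b, c, r)] - t) // bucket
        kept.setdefault((b, c, r, bucket_id), (b, c, r, t))
    return list(kept.values())


rows = [("firefox", "click", "google", t) for t in (0, 30, 100, 130)]
print([row[3] for row in dedup_by_distance_from_max(rows)])  # [0, 30, 100]
```

In this sample the rows at 0 and 30 land in different buckets even though they are only 30 seconds apart. That happens because the buckets are counted back from the latest event, not forward from the first one.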

answered 2013-07-16T16:30:18.463

It seems to me that something like this could work, but it needs testing:

-- `records` is the input relation (`input` itself is a Pig keyword)
view = FOREACH records GENERATE browser, click_type, referrer, datetime,
           GetYear(datetime) as year, GetMonth(datetime) as month, GetDay(datetime) as day,
           GetHour(datetime) as hour, GetMinute(datetime) as minute;
grp = GROUP view BY (browser, click_type, referrer, year, month, day, hour, minute);
uniq = FOREACH grp {
    top = LIMIT view 1;  -- keep a single record per calendar minute
    GENERATE FLATTEN(top.(browser, click_type, referrer, datetime));
};

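Note that this groups by calendar minute. If one event happened at 12:03:45 and another at 12:03:59, they would land in the same group. But events at 12:04:45 and 12:05:00 would land in different groups, even though they are only 15 seconds apart.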
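The boundary effect is easy to reproduce outside Pig. Below is a Python sketch of the same calendar-minute grouping key plus a LIMIT 1 that keeps the first row seen. It assumes rows are `(browser, click_type, referrer, datetime.datetime)` tuples. The names `dedup_by_calendar_minute` and `at` are hypothetical.

```python
from datetime import datetime


def dedup_by_calendar_minute(rows):
    """Keep the first row per (browser, click_type, referrer, calendar minute).

    rows: list of (browser, click_type, referrer, datetime.datetime) tuples.
    The key mirrors grouping on GetYear/GetMonth/GetDay/GetHour/GetMinute.
    """
    kept = {}
    for b, c, r, ts in rows:
        key = (b, c, r, ts.year, ts.month, ts.day, ts.hour, ts.minute)
        kept.setdefault(key, (b, c, r, ts))
    return list(kept.values())


def at(h, m, s):
    # Hypothetical helper: a timestamp on an arbitrary fixed day.
    return datetime(2013, 7, 16, h, m, s)


rows = [("firefox", "click", "google", at(12, 3, 45)),
        ("firefox", "click", "google", at(12, 3, 59)),
        ("firefox", "click", "google", at(12, 4, 45)),
        ("firefox", "click", "google", at(12, 5, 0))]
for row in dedup_by_calendar_minute(rows):
    print(row[3].strftime("%H:%M:%S"))
# 12:03:45
# 12:04:45
# 12:05:00
```

The 12:03:59 event is collapsed into 12:03:45. Both 12:04:45 and 12:05:00 survive, because they fall on opposite sides of a minute boundary.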

To get an exact 60-second window, you would need to write a UDF that iterates over the sorted bag for each (browser, click_type, referrer) group and drops the unwanted rows.
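The core of such a UDF can be sketched in plain Python. This covers only the logic. It assumes the UDF receives one (browser, click_type, referrer) group's bag as a list of tuples, with `datetime` in epoch seconds. The Pig wiring (registering the script and declaring an output schema) is left out, and the function name `drop_near_duplicates` is hypothetical.

```python
def drop_near_duplicates(bag, window=60):
    """Drop rows that follow an earlier row of the same group within `window` seconds.

    bag: the rows of ONE (browser, click_type, referrer) group, as
    (browser, click_type, referrer, datetime) tuples with datetime assumed to be
    epoch seconds. A row is dropped when some strictly earlier row satisfies
    earlier < datetime <= earlier + window, matching the SQL self-join.
    """
    kept = []
    prev_ts = None       # timestamp of the previous row in sorted order
    last_earlier = None  # latest timestamp strictly smaller than the current one
    for row in sorted(bag, key=lambda r: r[3]):
        ts = row[3]
        if prev_ts is not None and prev_ts < ts:
            last_earlier = prev_ts
        if last_earlier is None or ts - last_earlier > window:
            kept.append(row)
        prev_ts = ts
    return kept


bag = [("firefox", "click", "google", t) for t in (100, 0, 161, 30)]
print([row[3] for row in drop_near_duplicates(bag)])  # [0, 100, 161]
```

The sketch tracks the latest strictly earlier timestamp rather than just the previous row. That keeps it consistent with the SQL's strict `>`: rows with identical timestamps are not treated as duplicates of each other, so a `DISTINCT` pass may still be useful for exact copies. A chain of events 30 seconds apart collapses to its first row, as it does in the SQL.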

answered 2013-07-16T14:44:42.527

One approach is to group by the required fields and then filter inside a nested FOREACH:

   grpd = group records by (browser, click_type, referrer);

   top3 = foreach grpd {
      sorted = filter records by time < 60;
      top    = limit sorted 2;
      generate group, flatten(top);
   };
answered 2013-07-16T14:45:25.713

This would be another approach:

   records_group = group records by (browser, click_type, referrer);

   with_min = FOREACH records_group
              GENERATE FLATTEN(records), MAX(records.datetime) as maxDt;

   filterRecords = filter with_min by (maxDt - $2) < 60;

Here $2 stands for the position of the datetime field; change it to match your schema.
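To see what this filter selects, here is a Python sketch of the same per-group logic. It assumes `datetime` is epoch seconds and is the field `$2` is meant to reference. The name `keep_recent` is hypothetical.

```python
def keep_recent(rows, window=60):
    """Mirror `filter with_min by (maxDt - datetime) < 60` for each group.

    rows: list of (browser, click_type, referrer, datetime) tuples, datetime
    assumed to be epoch seconds.
    """
    # MAX(datetime) per (browser, click_type, referrer) group.
    latest = {}
    for b, c, r, t in rows:
        latest[(b, c, r)] = max(t, latest.get((b, c, r), t))
    # Keep rows less than `window` seconds before their group's latest event.
    return [row for row in rows if latest[row[:3]] - row[3] < window]


rows = [("firefox", "click", "google", t) for t in (0, 30, 100)]
rows.append(("chrome", "click", "google", 5))
print(keep_recent(rows))
# [('firefox', 'click', 'google', 100), ('chrome', 'click', 'google', 5)]
```

On this sample the filter keeps only the rows within 60 seconds of their group's latest event. The rows at 0 and 30 are both dropped, so the result depends on each row's distance from the latest event, not from its neighbours.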

answered 2013-07-16T14:57:49.337