3

我有时间戳样本,我正在使用 Pig 处理它们。我想每天找到样本的最小值和该最小值的时间。所以我需要选择包含最小值的样本的记录。

下面为简单起见,我将在两个字段中表示时间,第一个是日期,第二个是一天中的“时间”。

1,1,4.5
1,2,3.4
1,5,5.6

要找到最小值,请执行以下操作:

samples = LOAD 'testdata' USING PigStorage(',') AS (day:int, time:int, samp:float);
g = GROUP samples BY day;
dailyminima = FOREACH g GENERATE group as day, MIN(samples.samp) as samp;

但后来我失去了最小值发生的确切时间。我希望我可以使用嵌套表达式。我尝试了以下方法:

dailyminima = FOREACH g {
  minsample = MIN(samples.samp);
  mintuple = FILTER samples BY samp == minsample;
  GENERATE group as day, mintuple.time, mintuple.samp;
};

但是,我收到错误消息:

2012-11-12 12:08:40,458 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1000: 
<line 5, column 29> Invalid field reference. Referenced field [samp] does not exist in schema: .
Details at logfile: /home/hadoop/pig_1352722092997.log

如果我将 minsample 设置为常数,它不会抱怨:

dailyminima = FOREACH g {
  minsample = 3.4F;
  mintuple = FILTER samples BY samp == minsample;
  GENERATE group as day, mintuple.time, mintuple.samp;
};

并且确实产生了一个合理的结果:

(1,{(2)},{(3.4)})

在写这篇文章时,我想到了使用单独的 JOIN:

dailyminima = FOREACH g GENERATE group as day, MIN(samples.samp) as minsamp;
dailyminima = JOIN samples BY (day, samp), dailyminima BY (day, minsamp);

这行得通,但结果(在实际情况下)连接两个大型数据集,而不是搜索一天的值,这似乎并不健康。

在实际情况下,我实际上想找到最大和最小以及相关的时间。我希望嵌套表达式方法能让我同时做这两个。

解决方法的建议将不胜感激。

4

2 回答 2

3

感谢alexeipab 提供另一个SO question的链接。

一种可行的解决方案(查找最小值和最大值以及相关时间)是:

dailyminima = FOREACH g {
  minsamples = ORDER samples BY samp;
  minsample = LIMIT minsamples 1;
  maxsamples = ORDER samples BY samp DESC;
  maxsample = LIMIT maxsamples 1;
  GENERATE group as day, FLATTEN(minsample), FLATTEN(maxsample);
};
于 2012-11-12T16:24:21.427 回答
0

另一种方法是使用 PiggyBank ExtremalTupleByNthField,它的优点是它不对整个关系进行排序,并且只将(潜在的)最小值保留在内存中。这个 UDF 实现了累加器和代数,并且非常有效。

您的代码将如下所示:

DEFINE TupleByNthField org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField('3', 'min');

samples = LOAD 'testdata' USING PigStorage(',') AS (day:int, time:int, samp:float);

g = GROUP samples BY day;   

bagged = FOREACH g GENERATE TupleByNthField(samples);

flattened = FOREACH bagged GENERATE FLATTEN($0);

min_result = FOREACH flattened GENERATE $1 .. ;

请记住,我们基于samp字段进行排序的事实是在 DEFINE 语句中通过传递3作为第一个参数来定义的。

于 2016-01-24T13:18:35.547 回答