Is there a function in Apache Pig similar to the Lead/Lag functions in SQL? Or any Pig function that can look back at the previous row's record?
3 Answers
OK, here is my first attempt. Note that I only started learning how to write UDFs today.
The Maven pom.xml file contains:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.0.0-cdh4.1.0</version>
</dependency>
...
The Java UDF class:
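import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Returns the value received on the previous call, so on rows that arrive in the
// desired order it behaves like SQL LAG(x, 1). The first row gets null.
public class GenericLag extends EvalFunc<String> {
    // Value seen on the previous call (kept per UDF instance)
    private String lagObject = null;

    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        String returnObject = getLagObject();
        // Store the current value as a String; a null field stays null instead of
        // throwing a NullPointerException and leaving the previous value stale.
        Object current = input.get(0);
        setLagObject(current == null ? null : current.toString());
        return returnObject;
    }

    public String getLagObject() {
        return lagObject;
    }

    public void setLagObject(String lagObject) {
        this.lagObject = lagObject;
    }
}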
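Outside Pig, the idea behind GenericLag can be modeled in plain Java: one object remembers the previous value and hands it back on the next call, so the first call yields null. This is a sketch under the assumption that values arrive already sorted; the names `LagState`, `next` and `lagAll` are hypothetical and not part of Pig's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of GenericLag's state handling (no Pig dependency).
final class LagState {
    // Value seen on the previous call; null before the first call
    private String previous = null;

    // Returns the previous value and remembers the current one as a string.
    String next(Object current) {
        String result = previous;
        // Null-safe conversion so a null field does not throw and leave stale state
        previous = (current == null) ? null : current.toString();
        return result;
    }

    // Applies the lag to an already-sorted list, one call per row.
    static List<String> lagAll(List<?> sortedValues) {
        LagState state = new LagState();
        List<String> out = new ArrayList<>();
        for (Object v : sortedValues) {
            out.add(state.next(v));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(lagAll(List.of(10, 20, 30))); // [null, 10, 20]
    }
}
```

Because the state lives in a single object, the result is only meaningful if every row passes through that one object in sorted order.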
Initially I used Object instead of the String you see above, but I got this error:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2080: Foreach currently does not handle type Unknown
I had to use setLagObject(input.get(0).toString()); rather than setLagObject(input.get(0));, otherwise I got errors such as:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.String
java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.String
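These ClassCastExceptions come from treating a boxed field value (an Integer, Double or DateTime) as if it were already a String. A small plain-Java illustration, using a hypothetical helper `asLagString`: a direct cast fails for an Integer, while `toString()` works for any type, and the number can be recovered afterwards, much as the `(int)` cast does in the Pig script.

```java
// Hypothetical illustration of why the UDF stores toString() instead of casting.
final class CastDemo {
    // Null-safe conversion used instead of a (String) cast
    static String asLagString(Object value) {
        return value == null ? null : value.toString();
    }

    // Returns true if casting the value to String throws ClassCastException.
    static boolean castFails(Object value) {
        try {
            String ignored = (String) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object field = Integer.valueOf(42);
        System.out.println(castFails(field));    // true
        String lagged = asLagString(field);      // "42"
        int restored = Integer.parseInt(lagged); // back to an int
        System.out.println(restored + 1);        // 43
    }
}
```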
Here is how I use it in Pig:
REGISTER /path/to/compiled/file.jar;
DEFINE LAG fully.qualified.domain.name.GenericLag();
A = LOAD '/hdfs/path/to/directory' USING PigStorage(',') AS (
    important_order_by_field:int
    ,second_important_order_by_field:chararray
    ,...
    ,string_field_to_lag:chararray
    ,int_field_to_lag:int
    ,date_field_to_lag:chararray
    );
B = FOREACH A GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    ,string_field_to_lag
    ,int_field_to_lag
    ,ToDate(date_field_to_lag, 'yyyy-MM-dd HH:mm:ss') AS date_field_to_lag
    ;
-- Sort first: LAG only sees rows in the order they reach it
C = ORDER B BY important_order_by_field, second_important_order_by_field;
D = FOREACH C GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    ,LAG(string_field_to_lag) AS lag_string
    ,(int) LAG(int_field_to_lag) AS lag_int
    ,(date_field_to_lag IS NULL ?
        null :
        ToDate(SUBSTRING(REPLACE(LAG(date_field_to_lag), 'T', ' '), 0, 19), 'yyyy-MM-dd HH:mm:ss')) AS lag_date
    ;
DUMP D;
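One caveat, stated as an assumption rather than verified Pig behavior: a stateful UDF only sees the rows that reach its own instance. If the ordered relation is processed by several tasks, each task presumably starts with empty state, so the first row handled by every task would get a null lag even though it has a predecessor in the global order. The hypothetical plain-Java model below simulates that by splitting a sorted list into chunks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each chunk stands for one task with its own fresh lag state.
final class ChunkedLag {
    static List<Integer> lagPerChunk(List<List<Integer>> chunks) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> chunk : chunks) {
            Integer previous = null; // a new task starts with no remembered row
            for (Integer value : chunk) {
                out.add(previous);
                previous = value;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> oneTask = List.of(List.of(1, 2, 3, 4));
        List<List<Integer>> twoTasks = List.of(List.of(1, 2), List.of(3, 4));
        System.out.println(lagPerChunk(oneTask));  // [null, 1, 2, 3]
        System.out.println(lagPerChunk(twoTasks)); // [null, 1, null, 3]
    }
}
```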
If I write the last expression in D without the null check instead:
ToDate(SUBSTRING(REPLACE(LAG(date_field_to_lag), 'T', ' '), 0, 19), 'yyyy-MM-dd HH:mm:ss') AS lag_date
it returns the following error:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1066: Unable to open iterator for alias LAGGED_RHODES. Backend error : null
Checking the logs shows:
java.lang.NullPointerException
at org.joda.time.format.DateTimeFormatterBuilder$NumberFormatter.parseInto(DateTimeFormatterBuilder.java:1200)
This happens because the lagged value for the first row is null.
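The NullPointerException arises because the date parser is handed a null string on the first row. A plain-Java analogue of that conversion, with the null check applied to the lagged value itself, is sketched below. It uses `java.time` as a stand-in for Joda-Time and Pig's `ToDate` (an analogue, not Pig's code) and assumes the lagged string looks like an ISO value such as `2013-01-15T10:30:00.000-05:00`, which `REPLACE` plus `SUBSTRING(..., 0, 19)` reduces to `2013-01-15 10:30:00`.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Plain-Java analogue of:
// lag IS NULL ? null : ToDate(SUBSTRING(REPLACE(lag, 'T', ' '), 0, 19), 'yyyy-MM-dd HH:mm:ss')
final class LagDateParse {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static LocalDateTime parseLagged(String lagged) {
        if (lagged == null) {
            return null; // first row: skip the parser entirely
        }
        // Same steps as REPLACE(..., 'T', ' ') and SUBSTRING(..., 0, 19)
        String trimmed = lagged.replace('T', ' ').substring(0, 19);
        return LocalDateTime.parse(trimmed, FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(parseLagged("2013-01-15T10:30:00.000-05:00")); // 2013-01-15T10:30
        System.out.println(parseLagged(null));                            // null
    }
}
```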
Here is an alternative:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

// Lags every field of the input tuple at once and returns them as a tuple of
// chararrays (lag_0, lag_1, ...). The first row yields a tuple of nulls.
public class GenericLag2 extends EvalFunc<Tuple> {
    // Fields of the previous row as strings; null before the first row
    private List<String> lagObjects = null;

    @Override
    public Tuple exec(Tuple input) throws IOException {
        if (input == null) {
            return null;
        }
        // newTuple(n) creates n null fields, so the first row comes out as all nulls
        Tuple output = TupleFactory.getInstance().newTuple(input.size());
        if (lagObjects == null) {
            lagObjects = new ArrayList<String>();
        } else {
            for (int i = 0; i < lagObjects.size() && i < input.size(); i++) {
                output.set(i, lagObjects.get(i));
            }
        }
        // Remember the current row for the next call (also on the first row)
        lagObjects.clear();
        for (int i = 0; i < input.size(); i++) {
            Object value = input.get(i);
            lagObjects.add(value == null ? null : value.toString());
        }
        return output;
    }

    @Override
    public Schema outputSchema(Schema input) {
        if (input == null) {
            return null;
        }
        Schema tupleSchema = new Schema();
        try {
            // One chararray field per input field
            for (int i = 0; i < input.size(); i++) {
                tupleSchema.add(new FieldSchema("lag_" + i, DataType.CHARARRAY));
            }
            return new Schema(new FieldSchema(
                    getSchemaName(this.getClass().getName().toLowerCase(), input),
                    tupleSchema, DataType.TUPLE));
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }
}
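GenericLag2 applies the same remembered-state idea to every field of the input tuple. A plain-Java sketch of that behavior, under the same sorted-input assumption (the class `RowLag` and method `next` are hypothetical): each row yields the previous row's fields as strings, and the first row yields a row of nulls with the same width.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical multi-column lag: mirrors GenericLag2's logic without Pig types.
final class RowLag {
    // Fields of the previous row; null before the first row
    private String[] previous = null;

    String[] next(Object[] row) {
        // Same width as the input; fields stay null when there is no previous row
        String[] out = new String[row.length];
        if (previous != null) {
            System.arraycopy(previous, 0, out, 0, Math.min(previous.length, row.length));
        }
        previous = new String[row.length];
        for (int i = 0; i < row.length; i++) {
            previous[i] = (row[i] == null) ? null : row[i].toString();
        }
        return out;
    }

    public static void main(String[] args) {
        RowLag lag = new RowLag();
        List<Object[]> rows = List.of(
                new Object[] {"a", 1, 2.5},
                new Object[] {"b", 2, null},
                new Object[] {"c", 3, 7.0});
        List<String> printed = new ArrayList<>();
        for (Object[] row : rows) {
            printed.add(Arrays.toString(lag.next(row)));
        }
        System.out.println(printed); // [[null, null, null], [a, 1, 2.5], [b, 2, null]]
    }
}
```

One call per row replaces three separate LAG instances, at the cost of converting every field to a string and casting it back later.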
I expected GenericLag2 to be faster, but I'm not sure, because you then have to do the following:
...
C = ORDER B BY important_order_by_field, second_important_order_by_field;
D = FOREACH C GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    ,FLATTEN(LAG(
        string_field_to_lag
        ,int_field_to_lag
        ,date_field_to_lag
    )) AS (lag_string, lag_int, lag_date)
    ;
E = FOREACH D GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    ,lag_string
    ,(int) lag_int AS lag_int
    ,(lag_date IS NULL ?
        null :
        ToDate(SUBSTRING(REPLACE(lag_date, 'T', ' '), 0, 19), 'yyyy-MM-dd HH:mm:ss')) AS lag_date
    ;
DUMP E;
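Relation E turns the flattened chararray fields back into typed values, guarding each conversion against the all-null first row. A plain-Java sketch of that step, assuming the same three-column layout; the `LaggedRecord` class and its field names are hypothetical, and `java.time` again stands in for Pig's `ToDate`.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical typed view of one flattened lag row (lag_string, lag_int, lag_date).
final class LaggedRecord {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    final String lagString;
    final Integer lagInt;        // stays null on the first row
    final LocalDateTime lagDate; // stays null on the first row thanks to the null check

    LaggedRecord(String lagString, String lagInt, String lagDate) {
        this.lagString = lagString;
        this.lagInt = (lagInt == null) ? null : Integer.valueOf(lagInt);
        this.lagDate = (lagDate == null)
                ? null
                : LocalDateTime.parse(lagDate.replace('T', ' ').substring(0, 19), FORMAT);
    }

    public static void main(String[] args) {
        LaggedRecord first = new LaggedRecord(null, null, null);
        LaggedRecord second = new LaggedRecord("b", "2", "2013-01-15T10:30:00.000-05:00");
        System.out.println(first.lagInt + " " + first.lagDate); // null null
        System.out.println(second.lagInt + 1);                  // 3
    }
}
```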