达到预期结果的一种方法:
-- Load comma-separated rows from data.txt into six typed fields.
IN = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray,
date:int, region:chararray, op:chararray, value:int);
-- Sort every row by op (ascending) before grouping.
-- NOTE(review): the positional picks from bs below assume this ordering is
-- still intact inside each grouped bag; confirm that holds for your Pig version.
A = order IN by op asc;
-- Build one group per distinct (name, type, date, region) combination.
B = group A by (name, type, date, region);
C = foreach B {
-- Join the group's values into one comma-separated string, then split it
-- back into a tuple using ',' as the separator with a limit of 3.
bs = STRSPLIT(BagToString(A.value, ','),',',3);
-- Emit the flattened group key followed by three values: positions 2, 0 and 1
-- of bs are exposed as OpX, OpC and OpT respectively.
generate flatten(group) as (name, type, date, region),
bs.$2 as OpX:chararray, bs.$0 as OpC:chararray, bs.$1 as OpT:chararray;
}
describe C;
C: {name: chararray,type: chararray,date: int,region: chararray,OpX:
chararray,OpC: chararray,OpT: chararray}
dump C;
(john,ab,20130106,D,20,19,8)
(john,ab,20130106,E,98,854,67)
更新:
如果您想省去 order by(它会给计算增加一个额外的 reduce 阶段),
可以在 v 中为每个值加上其对应的 op 作为前缀。然后使用自定义 UDF 对元组字段进行排序,以得到所需的 OpX、OpC、OpT 顺序:
-- Register myjar.jar so that the TupleArrange UDF used below can be resolved.
register 'myjar.jar';
-- Load comma-separated rows from data.txt into six typed fields.
A = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray,
date:int, region:chararray, op:chararray, value:int);
-- Group the unsorted rows by (name, type, date, region); no ORDER step is used here.
B = group A by (name, type, date, region);
C = foreach B {
-- Prefix each value with its op so the op travels with the value as one string.
v = foreach A generate CONCAT(op, (chararray)value);
-- Join the prefixed values into one comma-separated string, then split it
-- back into a tuple using ',' as the separator with a limit of 3.
bs = STRSPLIT(BagToString(v, ','),',',3);
-- Emit the flattened group key followed by the three fields returned by
-- TupleArrange, named OpX, OpC and OpT.
generate flatten(group) as (name, type, date, region),
flatten(TupleArrange(bs)) as (OpX:chararray, OpC:chararray, OpT:chararray);
}
其中 myjar.jar 里的 TupleArrange
大致如下:
..
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
/**
 * Pig UDF that reorders three prefixed string values.
 *
 * <p>The single argument is a tuple holding three strings, each made of a
 * one-character prefix followed by a value. The strings are sorted in
 * ascending lexicographic order, the first character of each is dropped, and
 * the remainders are returned in the order (largest, smallest, middle).
 *
 * <p>NOTE(review): stripping exactly one character assumes every prefix is a
 * single character; confirm against the data being processed.
 */
public class TupleArrange extends EvalFunc<Tuple> {
    private static final TupleFactory tupleFactory = TupleFactory.getInstance();

    /**
     * Sorts the three prefixed strings and returns them without their prefixes.
     *
     * @param input argument tuple whose first field is a tuple of three strings
     * @return a three-field tuple of the un-prefixed values in the order
     *         (largest, smallest, middle), or {@code null} when {@code input}
     *         is null or empty, or when its first field is null
     * @throws IOException declared by the overridden method
     * @throws RuntimeException wrapping any failure while reading or
     *         rearranging the fields (e.g. fewer than three fields, a null or
     *         empty field)
     */
    @Override
    public Tuple exec(Tuple input) throws IOException {
        // Pig convention: a missing argument yields a null result rather than
        // failing the whole job.
        if (input == null || input.size() == 0) {
            return null;
        }
        try {
            Tuple inputTuple = (Tuple) input.get(0);
            if (inputTuple == null) {
                return null;
            }
            String[] tupleArr = new String[] {
                (String) inputTuple.get(0),
                (String) inputTuple.get(1),
                (String) inputTuple.get(2)
            };
            Arrays.sort(tupleArr); // ascending, so the leading prefix decides the order

            // Allocate the result only after the input has been validated.
            Tuple result = tupleFactory.newTuple(3);
            // substring(1) drops the one-character prefix from each value.
            result.set(0, tupleArr[2].substring(1));
            result.set(1, tupleArr[0].substring(1));
            result.set(2, tupleArr[1].substring(1));
            return result;
        } catch (Exception e) {
            throw new RuntimeException("TupleArrange error", e);
        }
    }

    /**
     * Reports the input schema unchanged as this function's output schema.
     *
     * @param input schema of the arguments passed to the function
     * @return the same {@code input} schema
     */
    @Override
    public Schema outputSchema(Schema input) {
        return input;
    }
}