It may be possible to get this result with built-in functions alone, in some tricky way, but a simple UDF also does the job:
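package com.example;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Emits one (previous, current) tuple for every pair of adjacent tuples in the input bag.
public class SlidingTuple extends EvalFunc<DataBag> {
    private static final BagFactory bagFactory = BagFactory.getInstance();
    private static final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        try {
            DataBag inputBag = (DataBag) input.get(0);
            DataBag result = null;
            if (inputBag != null) {
                result = bagFactory.newDefaultBag();
                Iterator<Tuple> it = inputBag.iterator();
                // Guard against an empty bag: next() on it would throw NoSuchElementException.
                Tuple previous = it.hasNext() ? it.next() : null;
                while (it.hasNext()) {
                    Tuple current = it.next();
                    // Pair the first field of the previous tuple with the first field of the current one.
                    Tuple tuple = tupleFactory.newTuple(2);
                    tuple.set(0, previous.get(0));
                    tuple.set(1, current.get(0));
                    result.add(tuple);
                    previous = current;
                }
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException("SlidingTuple error", e);
        }
    }
}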
Then:
A = LOAD '/user/hive/warehouse/twitter_raw/$date' USING PigStorage('\t')
AS (id:chararray, mess:chararray);
B = foreach A generate TOKENIZE(mess, ' ') as words;
Then apply the custom UDF (the jar that contains it has to be registered with REGISTER first):
C = foreach B generate com.example.SlidingTuple(words);
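To make the UDF's output concrete, here is a minimal sketch of the same pairing loop on a plain Java list, with no Pig dependencies. The class name `SlidingPairsSketch` and its `pairs` method are hypothetical helpers introduced only for illustration; they are not part of Pig or of the UDF above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for SlidingTuple: same sliding-pair logic, plain lists instead of Pig bags.
class SlidingPairsSketch {
    // Returns every adjacent pair (words[i], words[i + 1]) as a two-element list.
    // Inputs with fewer than two words yield an empty result.
    static List<List<String>> pairs(List<String> words) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i + 1 < words.size(); i++) {
            result.add(Arrays.asList(words.get(i), words.get(i + 1)));
        }
        return result;
    }
}
```

For a message tokenized into the words a, b, c, this produces the pairs (a, b) and (b, c), which is the shape of bag the UDF should return for each row of B.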