我正在尝试编写一个 Java UDF,它将使用 Java UDF 对包中的元组进行排名。元组具有作为排名标准的值列和初始设置为 0 的排名列。元组基于值列进行排序。所有的元组都放在一个袋子里,而那个袋子又放在一个传递给 UDF 的新元组中。
然而,UDF 正在修改排名列 - 一旦方法退出,值都再次变为 0。我不确定如何将值设置为“Stick”。
任何帮助将不胜感激。
这是我的java类
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;
/**
*
* @author Winter
*/
public class Ranker extends EvalFunc<String>{
@Override
public String exec(Tuple tuple) throws IOException {
if (tuple == null || tuple.size() == 0) {
return null;
}
List<Object> list = tuple.getAll();
DataBag db = (DataBag) list.get(0);
Integer num = (Integer)list.get(1);
Iterator<Tuple>itr = db.iterator();
boolean containsNonNull = false;
int i = 1;
double previous=0;
while (itr.hasNext()) {
Tuple t= itr.next();
double d = (Double)t.get(num.intValue());
int rankCol = t.size()-1;
Integer rankVal = (Integer)t.get(rankCol);
if(i == 0){
System.out.println("i==0");
previous = d;
t.set(rankCol, i);
} else {
if(d == previous)
t.set(rankCol, i);
else{
System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal);
t.set(rankCol, ++i);
rankVal = (Integer)t.get(rankCol);
System.out.println("|now rank val" + rankVal);
previous = d;
}
}
}
return "Y";
}
}
这就是我如何称呼 Pig 中的所有内容 -
REGISTER /myJar.jar;
A = LOAD '/Users/Winter/milk-tea-coffee.tsv' as (year:chararray, milk:double);
B = foreach A generate year, milk, 0 as rank;
C = order B by milk asc;
D = group C by rank order C by milk;
E = foreach D generate D.C.year,D.C.milk,D.C.rank, piglet3.evalFunctions.Ranker(D.C,1);
dump E;
由于 UDF 中的打印语句,我可以告诉它在 UDF 中的工作 - d!==previous|21.2|0.0|0|now rank val2 d!==previous|21.6|21.2|0|now rank val3 d!= =previous|21.9|21.6|0|now rank val4 d!==previous|22.0|21.9|0|now rank val5 d!==previous|22.5|22.0|0|now rank val6 d!==previous|22.9| 22.5|0|now rank val7 d!==previous|23.0|22.9|0|now rank val8 d!==previous|23.4|23.0|0|now rank val9 d!==previous|23.8|23.4|0|now rank val10 d!==previous|23.9|23.8|0|now rank val11
但是当我转储 E 或 D 或 C 时,排名列仅包含 0。