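Use a COGROUP to gather the records that share a key while avoiding the unwanted cross product of a JOIN. Then FILTER on whether the bag holding the records from b is empty, split the groups back into two relations, and UNION them: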
a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
c = COGROUP a BY iid, b BY iid;
c_filt = FILTER c BY NOT IsEmpty(b);
a_new = FOREACH c_filt GENERATE group AS iid, FLATTEN(a);
b_new = FOREACH c_filt GENERATE group AS iid, FLATTEN(b);
out = UNION ONSCHEMA a_new, b_new;
singled = DISTINCT out;
STORE (ORDER singled BY iid) INTO '$output';
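To see why the COGROUP avoids the cross product, here is a toy in-memory model in plain Java. It does not use Pig's API, and the class and method names (JoinVsCogroup, joinRowCount, cogroup, keptRowCount) are hypothetical, chosen only for illustration. Each row is a String[] whose column 0 plays the role of iid. A JOIN emits one row per matching (a-row, b-row) pair, so a key with 2 rows in a and 3 in b yields 2 × 3 = 6 rows. COGROUP instead keeps one bag per input per key, and flattening both bags back out yields only 2 + 3 = 5 rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Pig itself): each row is a String[] whose column 0 is the key (iid).
class JoinVsCogroup {

    // One COGROUP output tuple: a bag of a-rows and a bag of b-rows for a single key.
    static final class Group {
        final List<String[]> a = new ArrayList<>();
        final List<String[]> b = new ArrayList<>();
    }

    // JOIN pairs every a-row with every b-row that has the same key: |a_k| * |b_k| rows per key.
    static int joinRowCount(List<String[]> a, List<String[]> b) {
        int count = 0;
        for (String[] ra : a)
            for (String[] rb : b)
                if (ra[0].equals(rb[0])) count++;
        return count;
    }

    // COGROUP collects each input's rows into its own bag per key; nothing is paired up.
    static Map<String, Group> cogroup(List<String[]> a, List<String[]> b) {
        Map<String, Group> groups = new LinkedHashMap<>();
        for (String[] row : a) groups.computeIfAbsent(row[0], k -> new Group()).a.add(row);
        for (String[] row : b) groups.computeIfAbsent(row[0], k -> new Group()).b.add(row);
        return groups;
    }

    // Mirrors FILTER ... BY NOT IsEmpty(b) followed by FLATTEN(a) UNION FLATTEN(b):
    // each surviving key contributes |a_k| + |b_k| rows (before DISTINCT).
    static int keptRowCount(Map<String, Group> groups) {
        int count = 0;
        for (Group g : groups.values())
            if (!g.b.isEmpty()) count += g.a.size() + g.b.size();
        return count;
    }
}
```

For a = {(1,x), (1,y), (2,z)} and b = {(1,p), (1,q), (1,r)}, joinRowCount returns 6, while keptRowCount over the cogrouped data returns 5: key 1 contributes 2 + 3 rows, and key 2 is dropped because its b bag is empty.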
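However, I don't like this solution: it takes too many lines and new relations for such a simple operation. What is really needed is a way to merge two bags into one. Pig apparently doesn't provide this (if it does, please answer this SO question). You can, however, write a simple UDF to do it: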
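import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;

// Concatenates all of its bag arguments into a single bag (duplicates are kept).
public class MERGE extends EvalFunc<DataBag> {
    @Override
    public DataBag exec(Tuple input) throws IOException {
        DataBag b = new DefaultDataBag();
        try {
            if (input != null)
                for (int i = 0; i < input.size(); i++)
                    b.addAll((DataBag) input.get(i));
        } catch (Exception e) {
            return null;   // e.g. a null argument or one that is not a bag
        }
        return b;
    }
}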
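Note that MERGE concatenates its bags rather than de-duplicating them: a row that appears in both a and b ends up twice in the merged bag, which is why a DISTINCT is still needed afterwards. The following stdlib-only model illustrates those two steps, with Java lists standing in for Pig bags. MergeModel and its methods are hypothetical names used only for this sketch; they are not part of Pig.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Stdlib-only model of MERGE's concatenation (lists stand in for Pig bags).
class MergeModel {

    // Concatenate the bags in argument order; duplicate elements are kept.
    @SafeVarargs
    static <T> List<T> merge(List<T>... bags) {
        List<T> merged = new ArrayList<>();
        for (List<T> bag : bags) merged.addAll(bag);
        return merged;
    }

    // What a DISTINCT on the merged bag does: drop repeats, keeping first occurrences in order.
    static <T> List<T> distinct(List<T> bag) {
        return new ArrayList<>(new LinkedHashSet<>(bag));
    }
}
```

Merging [x, y] with [y, z] gives [x, y, y, z]; applying distinct to that gives [x, y, z].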
With this UDF, the solution becomes:
a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
c = FOREACH (FILTER (COGROUP a BY iid, b BY iid) BY NOT IsEmpty(b))
    GENERATE group AS iid, MERGE(a, b) AS bag;
out = FOREACH c {
    uniq = DISTINCT bag;
    GENERATE iid, FLATTEN(uniq);
};
STORE (ORDER out BY $0) INTO '$output';
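Another advantage of this approach is that with more than two inputs you don't need a separate FOREACH for each one after the COGROUP. Just add more arguments to MERGE: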
c = FOREACH (COGROUP a BY iid, b BY iid, ..., z BY iid)
GENERATE group AS iid, MERGE(a,b,...,z) AS bag;
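The multi-input pattern can be modeled in plain Java as a sketch, assuming rows are List&lt;String&gt; values whose first element is the integer iid. MultiInputMerge and mergeAll are hypothetical names, and this is an in-memory illustration, not Pig code. Every input is grouped by iid, each group's rows are merged and de-duplicated (the role of MERGE plus DISTINCT), and the groups come out sorted by iid (the role of ORDER). Like the snippet above, it applies no NOT IsEmpty(b) filter.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// In-memory model (not Pig) of COGROUP over any number of inputs,
// followed by MERGE, a per-group DISTINCT, and ORDER BY iid.
class MultiInputMerge {

    @SafeVarargs
    static List<List<String>> mergeAll(List<List<String>>... inputs) {
        // TreeMap keyed on the numeric iid yields the groups in ascending iid order.
        Map<Integer, Set<List<String>>> groups = new TreeMap<>();
        for (List<List<String>> input : inputs)
            for (List<String> row : input)
                // A LinkedHashSet per key merges rows from all inputs and drops duplicates.
                groups.computeIfAbsent(Integer.parseInt(row.get(0)), k -> new LinkedHashSet<>())
                      .add(row);
        List<List<String>> out = new ArrayList<>();
        for (Set<List<String>> bag : groups.values()) out.addAll(bag);
        return out;
    }
}
```

Adding a third or fourth input is just another argument to mergeAll, which mirrors how the Pig version only needs more arguments to MERGE. Because iid is parsed as an integer, key 10 sorts after key 2, matching iid's int type.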