正如 SNeumann 指出的那样,您可以使用 DataFu 的 SetIntersect 作为示例。
鉴于这些文件,以您的示例为基础:
1,word1 word4 word2 word1
2,word2 word6 word1 word5 word3 word7
3,word1 word3 word4 word5
并给出这个查询:
word2 word7 word5
然后这段代码给你你想要的:
define SetIntersect datafu.pig.sets.SetIntersect();
docs = LOAD 'docs' USING PigStorage(',') AS (id:int, line:chararray);
B = FOREACH docs GENERATE id, line;
C = FOREACH B GENERATE id, TOKENIZE(line) as gu;
filtered = FOREACH C {
uniq = DISTINCT gu;
GENERATE id, uniq;
}
query = LOAD 'query' AS (line_query:chararray);
bag_query = FOREACH query GENERATE TOKENIZE(line_query) AS query;
-- sort the bag of tokens, since SetIntersect requires it
bag_query = FOREACH bag_query {
query_sorted = ORDER query BY token;
GENERATE query_sorted;
}
result = FOREACH filtered {
-- sort the tokens, since SetIntersect requires it
tokens_sorted = ORDER uniq BY token;
GENERATE id,
SIZE(SetIntersect(tokens_sorted,bag_query.query_sorted)) as cnt;
}
DUMP result;
结果值:
(1,1)
(2,3)
(3,1)
这是一个完整的工作示例,您可以将其粘贴到位于此处的 SetIntersect 的 DataFu 单元测试中:
/**
register $JAR_PATH
define SetIntersect datafu.pig.sets.SetIntersect();
docs = LOAD 'docs' USING PigStorage(',') AS (id:int, line:chararray);
B = FOREACH docs GENERATE id, line;
C = FOREACH B GENERATE id, TOKENIZE(line) as gu;
filtered = FOREACH C {
uniq = DISTINCT gu;
GENERATE id, uniq;
}
query = LOAD 'query' AS (line_query:chararray);
bag_query = FOREACH query GENERATE TOKENIZE(line_query) AS query;
-- sort the bag of tokens, since SetIntersect requires it
bag_query = FOREACH bag_query {
query_sorted = ORDER query BY token;
GENERATE query_sorted;
}
result = FOREACH filtered {
-- sort the tokens, since SetIntersect requires it
tokens_sorted = ORDER uniq BY token;
GENERATE id,
SIZE(SetIntersect(tokens_sorted,bag_query.query_sorted)) as cnt;
}
DUMP result;
*/
@Multiline
private String setIntersectTestExample;
@Test
public void setIntersectTestExample() throws Exception
{
PigTest test = createPigTestFromString(setIntersectTestExample);
writeLinesToFile("docs",
"1,word1 word4 word2 word1",
"2,word2 word6 word1 word5 word3 word7",
"3,word1 word3 word4 word5");
writeLinesToFile("query",
"word2 word7 word5");
test.runScript();
super.getLinesForAlias(test, "filtered");
super.getLinesForAlias(test, "query");
super.getLinesForAlias(test, "result");
}
如果您有任何其他类似的用例,我很乐意听到它们 :) 我们一直在寻求为 DataFu 贡献更多有用的 UDF。