使用 Spark SQL,我有两个数据框,它们是从一个创建的,例如:
df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]
我想过滤 df1,如果它的“路径”的一部分是 df2 中的任何路径。因此,如果 df1 具有路径为“a/b/c/d/e”的行,我会发现 df2 中的行是否为路径为“a/b/c”的行。在 SQL 中应该是这样的
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
其中 udf 是用户定义的函数,可缩短 df1 的原始路径。幼稚的解决方案是使用 JOIN 然后过滤结果,但速度很慢,因为 df1 和 df2 各有超过 1000 万行。
我也尝试了以下代码,但首先我必须从 df2 创建广播变量
static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext
sqlContext.createDataFrame(df1.javaRDD().filter(
new Function<Row, Boolean>(){
@Override
public Boolean call(Row row) throws Exception {
String foo = shortenPath(row.getString(0));
return bdf.value().filter("path = '"+foo+"'").count()>0;
}
}
), myClass.class)
我遇到的问题是,在评估返回/执行 df2 过滤时,Spark 卡住了。
我想知道如何使用两个数据框来做到这一点。我真的很想避免加入。有任何想法吗?
编辑>>
在我的原始代码中,df1 具有别名“第一”和 df2“第二”。此连接不是笛卡尔连接,也不使用广播。
df1 = df1.as("first");
df2 = df2.as("second");
df1.join(df2, df1.col("first.path").
lt(df2.col("second.path"))
, "left_outer").
filter("isPrefix(first.path, second.path)").
na().drop("any");
isPrefix 是 udf
UDF2 isPrefix = new UDF2<String, String, Boolean>() {
@Override
public Boolean call(String p, String s) throws Exception {
//return true if (p.length()+4==s.length()) and s.contains(p)
}};
缩短路径 - 它剪切路径中的最后两个字符
UDF1 shortenPath = new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
String[] foo = s.split("/");
String result = "";
for (int i = 0; i < foo.length-2; i++) {
result += foo[i];
if(i<foo.length-3) result+="/";
}
return result;
}
};
记录示例。路径是独一无二的。
a/a/a/b/c abc
a/a/a qwe
a/b/c/d/e abc
a/b/c qwe
a/b/b/k foo
a/b/f/a bar
...
所以 df1 由
a/a/a/b/c abc
a/b/c/d/e abc
...
和 df2 组成
a/a/a qwe
a/b/c qwe
...