3

使用 Spark SQL,我有两个数据框,它们是从一个创建的,例如:

df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]

我想过滤 df1,如果它的“路径”的一部分是 df2 中的任何路径。因此,如果 df1 具有路径为“a/b/c/d/e”的行,我会发现 df2 中的行是否为路径为“a/b/c”的行。在 SQL 中应该是这样的

SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)

其中 udf 是用户定义的函数,可缩短 df1 的原始路径。幼稚的解决方案是使用 JOIN 然后过滤结果,但速度很慢,因为 df1 和 df2 各有超过 1000 万行。

我也尝试了以下代码,但首先我必须从 df2 创建广播变量

static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext 

sqlContext.createDataFrame(df1.javaRDD().filter(
         new Function<Row, Boolean>(){
             @Override
             public Boolean call(Row row) throws Exception {
                 String foo = shortenPath(row.getString(0));
                 return bdf.value().filter("path = '"+foo+"'").count()>0;
             }
          }
    ), myClass.class)

我遇到的问题是,在评估返回/执行 df2 过滤时,Spark 卡住了。

我想知道如何使用两个数据框来做到这一点。我真的很想避免加入。有任何想法吗?


编辑>>

在我的原始代码中,df1 具有别名“第一”和 df2“第二”。此连接不是笛卡尔连接,也不使用广播。

df1 = df1.as("first");
df2 = df2.as("second");

    df1.join(df2, df1.col("first.path").
                                lt(df2.col("second.path"))
                                      , "left_outer").
                    filter("isPrefix(first.path, second.path)").
                    na().drop("any");

isPrefix 是 udf

UDF2 isPrefix = new UDF2<String, String, Boolean>() {
        @Override
        public Boolean call(String p, String s) throws Exception {
            //return true if (p.length()+4==s.length()) and s.contains(p)
        }};

缩短路径 - 它剪切路径中的最后两个字符

UDF1 shortenPath = new UDF1<String, String>() {
        @Override
        public String call(String s) throws Exception {
            String[] foo = s.split("/");
            String result = "";
            for (int i = 0; i < foo.length-2; i++) {
                result += foo[i];
                if(i<foo.length-3) result+="/";
            }
            return result;
        }
    };

记录示例。路径是独一无二的。

a/a/a/b/c abc
a/a/a     qwe
a/b/c/d/e abc
a/b/c     qwe
a/b/b/k   foo
a/b/f/a   bar
...

所以 df1 由

a/a/a/b/c abc
a/b/c/d/e abc
...

和 df2 组成

a/a/a     qwe
a/b/c     qwe
...
4

2 回答 2

1

作为IN使用子查询实现的一种可能方式,LEFT SEMI JOIN可以使用:

    JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp");
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    StructType schema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("path", DataTypes.StringType, false),
            DataTypes.createStructField("value", DataTypes.StringType, false)
    });
    // Prepare First DataFrame
    List<Row> dataForFirstDF = new ArrayList<>();
    dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc"));
    dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
    dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
    DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
    // 
    df1.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // |    x/y/z|  xyz|
    // +---------+-----+

    // Prepare Second DataFrame
    List<Row> dataForSecondDF = new ArrayList<>();
    dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
    dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
    DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);

    // Use left semi join to filter out df1 based on path in df2
    Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path"));
    DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");

    //
    result.show();
    //
    // +---------+-----+
    // |     path|value|
    // +---------+-----+
    // |a/a/a/b/c|  abc|
    // |a/b/c/d/e|  abc|
    // +---------+-----+

此类查询的物理计划将如下所示:

== Physical Plan ==
Limit 21
 ConvertToSafe
  LeftSemiJoinBNL Some(Contains(path#0, path#2))
   ConvertToUnsafe
    Scan PhysicalRDD[path#0,value#1]
   TungstenProject [path#2]
    Scan PhysicalRDD[path#2,value#3]

它将使用 LeftSemiJoinBNL 进行实际的连接操作,该操作应在内部广播值。从更多细节参考 Spark 中的实际实现 - LeftSemiJoinBNL.scala

PS 我不太明白删除最后两个字符的必要性,但如果需要 - 可以这样做,就像@zero323 建议的那样(使用regexp_extract)。

于 2015-12-17T11:10:34.600 回答
1

您的代码至少有几个问题:

  • 您不能在另一个操作或转换中执行操作或转换。这意味着过滤广播DataFrame根本​​不起作用,你应该得到一个例外。
  • join您使用的是作为笛卡尔积执行,然后是过滤器。由于 SparkHashing用于连接,因此只有基于相等的连接才能在没有笛卡尔坐标的情况下有效执行。它与为什么在 SQL 查询中使用 UDF 会导致笛卡尔积略有相关?
  • 如果两者DataFrames都相对较大并且具有相似的大小,则广播不太可能有用。看看为什么我的 BroadcastHashJoin 比 Spark 中的 ShuffledHashJoin 慢
  • 在性能方面并不重要,但isPrefix似乎是错误的。特别是它看起来可以同时匹配前缀和后缀
  • col("first.path").lt(col("second.path"))条件看起来不对。我假设你想要a/a/a/b/cfromdf1匹配a/a/afrom df2。如果是这样,它应该gt不是lt

可能你能做的最好的事情是类似于这样的:

import org.apache.spark.sql.functions.{col, regexp_extract}

val df = sc.parallelize(Seq(
    ("a/a/a/b/c", "abc"), ("a/a/a","qwe"),
    ("a/b/c/d/e", "abc"), ("a/b/c", "qwe"),
    ("a/b/b/k", "foo"), ("a/b/f/a", "bar")
)).toDF("path", "value")

val df1 = df
    .where(col("value") === "abc")    
    .withColumn("path_short", regexp_extract(col("path"), "^(.*)(/.){2}$", 1))
    .as("df1")

val df2 = df.where(col("value") === "qwe").as("df2")
val joined = df1.join(df2, col("df1.path_short") === col("df2.path"))

您可以尝试像这样广播其中一张表(仅限 Spark >= 1.5.0):

import org.apache.spark.sql.functions.broadcast

df1.join(broadcast(df2), col("df1.path_short") === col("df2.path"))

并增加自动广播限制,但正如我上面提到的那样,它很可能比 plain 效率低HashJoin

于 2015-12-16T20:13:14.320 回答