1

我有以下两个DataFrames

DataFrame "dfPromotion":
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther":
date        | store
===================
2017-01-01  | 1    
2017-01-03  | 1    

后来我需要以上union两个DataFrames。但在我必须删除所有dfOther具有date值的行之前,它也包含在dfPromotion.

以下filtering步骤的结果应如下所示:

DataFrame "dfPromotion" (this stays always the same, must not be changed in this step!)
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther" (first row is removed as dfPromotion contains the date 2017-01-01 in the "date" column)
date        | store
===================
2017-01-03  | 1 

有没有办法在 Java 中做到这一点?我只找到了这个DataFrame.except方法,但这会检查 DataFrames 的所有列。我需要仅按column过滤第二个 DataFramedate,因为稍后可以添加其他列,其中可能包含不同的值...

调用dfOther.filter(dfOther.col("date").isin(dfPromotion.col("date")))会引发以下异常:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) date#64 missing from date#0,store#13 in operator !Filter date#0 IN (date#64);
4

2 回答 2

2

您可以使用减法功能,

dfOther.select("date").except(dfPromotion.select("date")).join(dfOther,'date').show()
于 2017-03-15T18:14:15.973 回答
1

既然你提到了 Spark Hive,你可以试试下面的 spark sql 方法吗?

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
val dfpromotion = sqlContext.sql("select * from dfpromotion");

dfpromotion.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-02|    1|
+----------+-----+

val dfother = sqlContext.sql("select * from dfother");

dfother.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-03|    1|
+----------+-----+


val dfdiff = sqlContext.sql("select o.dt, o.store from dfpromotion p right         outer join dfother o on p.dt = o.dt where p.dt is null");
val dfunion = dfpromotion.union(dfdiff);


scala> dfunion.show
+----------+-----+
|        dt|store|
+----------+-----+
|2017-01-01|    1|
|2017-01-02|    1|
|2017-01-03|    1|
于 2017-03-15T18:37:48.053 回答