我正在使用三个 SchemaRDD 进行三向连接(每个记录大约为一百万条记录,存储在 HDFS 上的 Parquet 文件中)。
架构如下:
- table1 有四个字段:id、group_id、t2_id 和日期
- table2 有三个字段:id、group_id 和 t3_id
- table3 有三个字段:id、group_id 和 date
我试图找出组内 table1 和 table3 之间的关系。
我将使用的 SQL 查询是:
SELECT group_id, t1.id, t3.id
FROM table1, table2, table3
WHERE t1.group_id = t2.group_id and t1.t2_id = t2.id and
and t2.group_id = t3.group_id and t2.t3_id = t3.id and
t3.date < t1.date
但是,我正在尝试在 Spark 中执行此操作:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
val tab1 = sqlContext.parquetFile("warehouse/tab1_pq")
val tab2 = sqlContext.parquetFile("warehouse/tab2_pq")
val tab3 = sqlContext.parquetFile("warehouse/tab3_pq")
val relationship = tab1.as('t1).
join(tab2.as('t2), Inner, Some(("t2.group_id".attr === "t1.group_id".attr) && ("t2.id".attr === "t1.t2_id".attr))).
join(tab3.as('t3), Inner, Some(("t3.group_id".attr === "t2.group_id".attr) && ("t3.id".attr === "t2.t3_id".attr))).
where("t3.date".attr <= "t1.date".attr).
select("t1.group_id".attr, "t1.id".attr, "t3.id".attr)
所以这似乎可行——但它在同一个(3 单元,EMR)集群上的运行速度明显慢于 impala。这是正确的方法吗?有没有办法让它更高效?
谢谢你的帮助