0

Spark SQL 的 AS 别名列可以是(相关)子查询吗?

原因:java.lang.RuntimeException:在 [storeid#4,combox_pid#6,pid#7,count(1)#61L] 中找不到 count(DISTINCT orderid)#69L

select b.pid, (select count(distinct orderid) from a where a.pid=b.pid) as order_num,count(1) from b as b group by b.pid

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object OrderCountTset {
  // Reduce Spark's verbose INFO output so the .show() tables are readable.
  // (Requires org.apache.log4j.{Level, Logger} to be imported.)
  Logger.getRootLogger.setLevel(Level.WARN)

  /**
   * Demonstrates the "count(DISTINCT orderid)#... not found" failure when a
   * correlated scalar subquery is combined with GROUP BY, and the working
   * two-step approach: first materialize the grouped counts into a temp view
   * (`tab_tmp_2`), then attach the distinct order count per `pid` via a
   * correlated subquery against the original data.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    // Sample rows: (storeid, orderid, combox_pid, pid).
    val data = Seq(
      Row("a", "100", "200", "300"),
      Row("a", "100", "200", "300"),
      Row("a", "101", "201", "300"),
      Row("a", "101", "200", "3001"),
      Row("a", "102", "200", "300"),
      Row("a", "103", "201", "300")
    )
    // Explicit schema for the Row-based data (all columns as strings).
    val schema = new StructType()
      .add("storeid", StringType)
      .add("orderid", StringType)
      .add("combox_pid", StringType)
      .add("pid", StringType)

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
      df.show()
      df.createOrReplaceTempView("tab_tmp")

      // Grouping by orderid as well, so count(distinct orderid) is trivially 1
      // per group — shown only as a baseline that does NOT hit the error.
      spark.sql(
        "select storeid,orderid,combox_pid,pid,count(distinct orderid ),count(1) " +
          "from tab_tmp group by storeid,orderid,combox_pid,pid").show()

      // The query below reproduces the reported failure: a correlated scalar
      // subquery with count(distinct ...) inside a grouped outer query.
      //    spark.sql(
      //      """
      //        |select b.storeid,b.combox_pid,b.pid,
      //        |(select count(distinct a.orderid) from tab_tmp as a where a.pid=b.pid) as order_num,
      //        |count(1)
      //        | from tab_tmp as b group by b.storeid,b.combox_pid,b.pid
      //      """.stripMargin).show()

      // Workaround step 1: do the GROUP BY aggregation first and register it
      // as its own view, so the outer query no longer needs GROUP BY.
      spark.sql(
        """
          |select storeid,combox_pid,pid,count(1) as num
          | from tab_tmp group by storeid,combox_pid,pid
        """.stripMargin).createOrReplaceTempView("tab_tmp_2")

      // Workaround step 2: the correlated subquery now runs against a
      // non-aggregated outer query, which Spark supports.
      spark.sql(
        """
          |select b.storeid,b.combox_pid,b.pid,num,
          |(select count(distinct a.orderid) from tab_tmp as a where a.pid=b.pid) as order_num
          | from tab_tmp_2 as b
        """.stripMargin).show()
    } finally {
      // Release the local Spark resources even if a query above fails.
      spark.stop()
    }
  }

}
4

1 回答 1

0

问题出在你的 SQL 查询写法本身,而不是 Spark 对子查询的支持。用下面的 Spark SQL 替换你的最后一个查询即可。

如果您的需求是计算 distinct orderid 时不按 pid 区分,请使用以下查询:

spark.sql(
     |       """
     |         |select b.storeid,b.combox_pid,b.pid,num,
     |         |(select count(distinct a.orderid) from tab_tmp as a join tab_tmp_2 as b on a.pid=b.pid) as order_num
     |         | from tab_tmp_2 as b
     |       """.stripMargin).show()
+-------+----------+----+---+---------+
|storeid|combox_pid| pid|num|order_num|
+-------+----------+----+---+---------+
|      a|       200| 300|  3|        4|
|      a|       201| 300|  2|        4|
|      a|       200|3001|  1|        4|
+-------+----------+----+---+---------+

或者,如果您的需求是得到每个 pid 的 distinct orderid 数量,请使用下面的 Query:

scala> spark.sql("select a.pid, count(distinct a.orderid) as order_num from tab_tmp as a join tab_tmp_2 as b on a.pid=b.pid group by a.pid").createOrReplaceTempView("tab_tmp_3")

scala> spark.sql(
     |       """
     |         |select b.storeid,b.combox_pid,b.pid, c.order_num
     |         | from tab_tmp_2 as b left join tab_tmp_3 c on b.pid = c.pid
     |       """.stripMargin).show()
+-------+----------+----+---------+
|storeid|combox_pid| pid|order_num|
+-------+----------+----+---------+
|      a|       200| 300|        4|
|      a|       201| 300|        4|
|      a|       200|3001|        1|
+-------+----------+----+---------+
于 2020-02-10T06:22:53.877 回答