0

“必须聚合相关标量子查询”是什么意思?,SparkSQL 抱怨标量子查询。

因此,如果仅通过查看 SQL 语句(不查看您的数据),催化剂不能 100% 确定子查询仅返回单行,则会引发此异常。如果您确定您的子查询只提供一行,您可以使用以下聚合标准函数之一,因此 Spark Analyzer 很高兴:

  • 第一的
  • 平均
  • 最大限度
  • 分钟

如何使下面的 SQL 查询工作?

SELECT
    prerequisite AS prerequisite,
    (SELECT e.description FROM course e WHERE e.course_no = c.prerequisite) as course_name,
    COUNT(*) as cnt
FROM
    course c
WHERE 
    c.prerequisite IS NOT NULL
GROUP BY 
    c.prerequisite
ORDER BY
    prerequisite;

它在 Oracle 中工作并返回结果。

PREREQUISITE COURSE_NAME                                               CNT
------------ -------------------------------------------------- ----------
          10 Technology Concepts                                         1
          20 Intro to Information Systems                                5
          25 Intro to Programming                                        2
          80 Programming Techniques                                      2
         120 Intro to Java Programming                                   1
         122 Intermediate Java Programming                               2
         125 Java Developer I                                            1
         130 Intro to Unix                                               2
         132 Basics of Unix Admin                                        1
         134 Advanced Unix Admin                                         1
         140 Systems Analysis                                            1
         204 Intro to SQL                                                1
         220 PL/SQL Programming                                          1
         310 Operating Systems                                           2
         350 Java Developer II                                           2
         420 Database System Principles                                  1

但是它在 SparkSQL 中失败并出现已知错误:

AnalysisException:必须聚合相关的标量子查询

如果我输入FIRSTor MAX,它会引发另一个异常Couldn't find first(description),这似乎是因为标量查询仅返回 1 行,因此无法从中找到第一行。

SELECT
    prerequisite AS prerequisite,
    (SELECT FIRST(e.description) FROM course e WHERE e.course_no = c.prerequisite) as course_name,
    COUNT(*) as cnt
FROM
    course c
WHERE 
    c.prerequisite IS NOT NULL
GROUP BY 
    c.prerequisite
ORDER BY
    prerequisite
----------
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [214], in <module>
      1 query="""
      2 SELECT
      3     prerequisite AS prerequisite,
   (...)
     13     prerequisite
     14 """
---> 15 spark.sql(query).show(truncate=False)

File /opt/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/sql/dataframe.py:486, in DataFrame.show(self, n, truncate, vertical)
    484     print(self._jdf.showString(n, 20, vertical))
    485 else:
--> 486     print(self._jdf.showString(n, int(truncate), vertical))

File /opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File /opt/spark/spark-3.1.2/python/lib/pyspark.zip/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File /opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o1058.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: first(description)#13046
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:307)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:96)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:96)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:554)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:741)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:187)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
    at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Couldn't find first(description)#13046 in [prerequisite#5422,count(1)#13045L]
    at scala.sys.package$.error(package.scala:30)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 61 more
4

2 回答 2

1

我认为您可以简单地使用 join 而不是相关子查询:

SELECT
  c.prerequisite,
  FIRST(e.description) AS course_name,
  COUNT(*) AS cnt
FROM
  course c
LEFT JOIN
  course e
ON
  e.course_no = c.prerequisite
WHERE
  c.prerequisite IS NOT NULL
GROUP BY
  c.prerequisite
ORDER BY
  1
于 2022-02-20T15:13:55.203 回答
-1

你试过下面的写法吗?

`SELECT
    prerequisite AS prerequisite,
    FIRST((SELECT e.description FROM course e WHERE e.course_no = c.prerequisite)) as course_name,
    COUNT(*) as cnt
FROM
    course c
WHERE 
    c.prerequisite IS NOT NULL
GROUP BY 
    c.prerequisite
ORDER BY
    prerequisite`

我觉得没问题...

于 2022-02-19T08:56:51.783 回答