
I have the following data.frame:

library(sparklyr)
library(dplyr)
testDF <- data.frame(A = c(1, 2, 3, 4, 5, 6, 7, 8), 
                     B = c(10, 20, 30, 40, 50, 60, 70, 80), 
                     C = c(100, 200, 300, 400, 500, 600, 700, 800), 
                     D = c(1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000))

Once created, I can copy it into Spark using sparklyr:

testDFCopied <- copy_to(sc, testDF, "testDF", overwrite = TRUE)

Once copied, I can use mutate to create a new column with the lag function:

head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A)), 10)
Source:   query [?? x 5]
Database: spark connection master=yarn app=sparklyr local=FALSE

      A     B     C     D     E
  <dbl> <dbl> <dbl> <dbl> <dbl>
1     1    10   100  1000   NaN
2     2    20   200  2000     1
3     3    30   300  3000     2
4     4    40   400  4000     3
5     5    50   500  5000     4
6     6    60   600  6000     5
7     7    70   700  7000     6
8     8    80   800  8000     7

The problem appears when I try to create more than one column with mutate, using lag for each of them at the same time. For example, here I want to create two new columns E and F, which are the "lags" of columns A and B:

head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A), F = lag(B)), 10)
Source:   query [?? x 6]
Database: spark connection master=yarn app=sparklyr local=FALSE

Error: org.apache.spark.sql.AnalysisException: Window Frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$29$$anonfun$applyOrElse$10.applyOrElse(Analyzer.scala:1785)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$29$$anonfun$applyOrElse$10.applyOrElse(Analyzer.scala:1781)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)

However, if I create two columns but use lag only once, the exception is not thrown, e.g.:

head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A), F = C - B), 10)
Source:   query [?? x 6]
Database: spark connection master=yarn app=sparklyr local=FALSE

      A     B     C     D     E     F
  <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1     1    10   100  1000   NaN    90
2     2    20   200  2000     1   180
3     3    30   300  3000     2   270
4     4    40   400  4000     3   360
5     5    50   500  5000     4   450
6     6    60   600  6000     5   540
7     7    70   700  7000     6   630
8     8    80   800  8000     7   720
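To see why one query succeeds and the other fails, it can help to inspect the SQL that dbplyr generates for Spark without executing it. This is a hedged sketch (it assumes the `testDFCopied` tbl from above and a live Spark connection `sc`); `sql_render()` is a dbplyr function that works on any remote tbl:

```r
library(dplyr)
library(dbplyr)

# Render (but do not run) the SQL for the failing two-lag query,
# to see which window specifications Spark is asked to combine.
testDFCopied %>%
  arrange(A) %>%
  mutate(E = lag(A), F = lag(B)) %>%
  sql_render()
```

If both lag calls end up inside a single SELECT with incompatible window frames, that would match the analysis error Spark reports.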

For some reason, the exception is thrown only when two lag() calls are performed within a single mutate operation. I have tried (unsuccessfully) different combinations of lag(), lead(), and mutate. They all raise the same exception, which I don't understand. Looking at the Spark source code, I can see the exception is raised here:

  /**
   * Check and add proper window frames for all window functions.
   */
  object ResolveWindowFrame extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case logical: LogicalPlan => logical transformExpressions {
        case WindowExpression(wf: WindowFunction,
        WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
          if wf.frame != UnspecifiedFrame && wf.frame != f =>
          failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}")
...

I know it must be related to some condition on window functions that lag fails to satisfy, but I don't really understand the underlying problem here. Any help/ideas would be greatly appreciated.
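One thing I have considered trying (an untested sketch, assuming the same `testDFCopied` tbl and connection as above) is to put each lag() in its own mutate() call, so that each window function is translated into a separate subquery rather than sharing one SELECT:

```r
library(dplyr)

# Sketch of a possible workaround: chain separate mutate() calls so
# each lag() gets its own window specification in the generated SQL,
# instead of two lags competing inside one SELECT.
testDFCopied %>%
  arrange(A) %>%
  mutate(E = lag(A)) %>%
  mutate(F = lag(B)) %>%
  head(10)
```

I am not sure whether this avoids the frame-mismatch check or merely moves it, which is part of what I am hoping someone can explain.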
