2

假设我想执行如下操作:

library(SparkR)
...
df = spark.read.parquet(<some_address>)
df.gapply(
    df,
    df$column1,
    function(key, x) {
        return(data.frame(x, newcol1=f1(x), newcol2=f2(x))
    }
)

其中函数的返回有多行。需要明确的是,文档中的示例(遗憾地与 Spark 文档的大部分内容相呼应,其中示例非常简单)并不能帮助我确定这是否会按照我的预期进行处理。

我希望这样做的结果是,对于在 DataFrame 中创建的 k 个组,每个组具有 n_k 个输出行,gapply() 调用的结果将具有 sum(1..k, n_k) 行,其中关键为键 k 中的每个组的每个 n_k 行复制值...但是,模式字段向我表明这不是处理方式-实际上它表明它要么希望将结果推送到单行。

希望这很清楚,尽管是理论上的(对不起,我不能分享我的实际代码示例)。有人可以验证或解释如何实际处理这样的功能吗?

4

1 回答 1

0

官方文档中明确说明了有关输入和输出的确切期望:

将函数应用于SparkDataFrame. 该函数将应用于 SparkDataFrame 的每一组,并且应该只有两个参数:分组键和data.frame对应于该键的R。这些组是从SparkDataFrames列中选择的。函数的输出应该是 a data.frame

Schema 指定结果的行格式SparkDataFrame。它必须基于 Spark 数据类型来表示 R 函数的输出模式。返回的列名data.frame由用户设置。下面是 R 和 Spark 之间的数据类型映射。

换句话说,您的函数应该采用keydata.frame该键对应的行和行,并返回data.frame可以使用 Spark SQL 类型表示的行,并将架构作为schema参数提供。行数没有限制。例如,您可以按如下方式应用身份转换:

df <- as.DataFrame(iris)

gapply(df, "Species", function(k, x) x, schema(df))

与聚合相同的方式:

gapply(df, "Species",
  function(k, x) {
    dplyr::summarize(dplyr::group_by(x, Species), max(Sepal_Width))
  },
  structType(
    structField("species", "string"),
    structField("max_s_width", "double"))
)

尽管在实践中您应该更喜欢直接在DataFrame( groupBy %>% agg) 上进行聚合。

于 2016-09-08T16:47:52.330 回答