1

我正在使用sparklyr图书馆。

我有一个变量,wtd我将其复制到 spark:

copy_to(sc,wtd)
colnames(wtd) <- c("a","b","c","d","e","f","g")

然后我想做一个计算并将其存储在 spark 中,而不是在我的 R 环境中。

当我尝试:

sdf_register(wtd %>% group_by(c,b) %>% filter(row_number()==1) %>%count(d), "wtd2")

UseMethod(“sdf_register”)中的错误:没有适用于“sdf_register”的方法应用于类“c('tbl_df','tbl','data.frame')”的对象

该命令wtd2 = wtd %>% group_by(c,b) %>% filter(row_number()==1) %>%count(d)可以正常工作,但这会将其存储在我的环境中,而不是火花中。

4

1 回答 1

4

您的操作序列中的第一个参数应该是“tbl_spark”,而不是常规的data.frame. 你的命令,

wtd2 = wtd %>% group_by(c,b) %>% filter(row_number()==1) %>%count(d)

工作,因为你根本没有使用 Spark,只是普通的 R data.frames。

如果您想将它与 spark 一起使用,首先,存储spark_tbl复制时返回的变量data.frame

colnames(wtd) <- c("a","b","c","d","e","f","g")
wtd_tbl <- copy_to(sc, wtd)

然后,您可以使用sdf_register(wtd_tbl %>% ..., "wtd2").

如果您按照定义执行管道,您将收到异常消息:

Error: org.apache.spark.sql.AnalysisException: Window function rownumber() requires window to be ordered

这是因为要row_number()在 Spark 中使用,首先需要提供一个“订单功能”。你可以用arrange(). 我假设您希望您的行按“c”和“b”列排序,因此您的最终管道将是这样的:

sdf_register(wtd_tbl %>% 
               dplyr::group_by(c, b) %>% 
               arrange(c, b) %>% 
               dplyr::filter(row_number() == 1) %>% 
               dplyr::count(d),
             "wtd2")

我希望这有帮助。

于 2017-04-09T07:59:35.680 回答