0

我对 Spark 很陌生,我试图在网上寻找一些东西,但我没有找到任何令人满意的东西。

我一直使用命令运行并行计算mclapply,我喜欢它的结构(即,第一个参数用作滚动索引,第二个参数是要并行化的函数,然后是传递给函数的其他可选参数)。现在我正在尝试通过 Spark 做同样的事情,即我想在 Spark 集群的所有节点之间分配我的计算。这就是我所学到的以及我认为代码应该如何构造的内容(我正在使用包sparklyr):

  1. 我使用命令创建到 Spark 的连接spark_connect
  2. 我在 Spark 环境中复制我的 data.framecopy_to并通过它的 tibble 访问它;
  3. 我想实现一个“Spark-friendly”版本mclapply,但我看到包中没有类似的功能(我看到包中存在该功能spark.lapplySparkR但不幸的是它不再在 CRAN 中)。

下面是我实现的一个简单的测试脚本,它使用函数mclapply.

#### Standard code that works with mclapply #########
dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))

.testFunc = function(X = 1, df, str) {
    rowSelected = df[X, ]
    y = as.numeric(rowSelected[1] + rowSelected[2])
    return(list(y = y, str = str))
}

lOutput = mclapply(X = 1 : nrow(dfTest), FUN = .testFunc, df = dfTest, 
                   str = "useless string", mc.cores = 2)

######################################################

###### Similar code that should work with Spark ######
library(sparklyr)
sc = spark_connect(master = "local")

dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))

.testFunc = function(X = 1, df, str) {
  rowSelected = df[X, ]
  nSum = as.numeric(rowSelected[1] + rowSelected[2])
  return(list(nSum = nSum, str = str))
}

dfTest_tbl = copy_to(sc, dfTest, "test_tbl", overwrite = TRUE)

# Apply similar function mclapply to dfTest_tbl, that works with 
# Spark
# ???
######################################################

如果有人已经找到了解决方案,那就太好了。其他参考/指南/链接也非常受欢迎。谢谢!

4

2 回答 2

5

闪闪发光的

spark_apply是您正在寻找的现有功能:

spark_apply(sdf, function(data) {
   ...
})

有关详细信息,请参阅文档中的分布式R。sparklyr

火花R

与 SparkR 一起使用gapply/gapplyCollect

gapply(df, groupingCols, function(data) {...} schema)

dapply/dapplyCollect

dapply(df, function(data) {...}, schema)

UDF。参考

详情。

请注意,与本机 Spark 代码相比,所有解决方案都较差,在需要高性能时应避免使用。

于 2018-01-15T14:32:49.067 回答
1

sparklyr::spark_apply现在可以支持将一些外部变量(如模型)作为上下文传递。

这是我在 sparklyr 上运行 xgboost 模型的示例:

bst <- xgboost::xgb.load("project/models/xgboost.model")
res3 <- spark_apply(x = ft_union_price %>% sdf_repartition(partitions = 1500, partition_by = "uid"),
                   f = inference_fn,
                   packages = F,
                   memory = F,
                   names = c("uid",
                               "action_1",
                               "pred"), 
                   context = {model <- bst})
于 2018-10-27T15:13:37.293 回答