4

我试图弄清楚boot()在运行并行计算时如何将函数和包传递给函数。在循环中加载包或定义函数似乎非常昂贵。foreach()我经常用于其他并行任务的函数有一个 .packages 和 .export 参数,它们以一种很好的方式处理这个(参见这个SO question),但我不知道如何使用引导包来做到这一点。

下面是一个毫无意义的示例,显示了切换到并行时会发生什么:

library(boot)
myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}

x <- runif(1000)

bootTest <- function(){
  out <- boot(data=x, statistic=meaninglessTest, R=10000, parallel="snow", ncpus=4)
  return(boot.ci(out, type="perc"))
}

bootTest()

抱怨(如预期)它找不到myMean

旁注:运行此示例时,它的运行速度比单核慢,可能是因为将这个简单的任务拆分到多个核上比实际任务更耗时。为什么默认情况下不分成均匀的作业批次R/ncpus- 这不是默认行为有什么原因吗?

旁注更新:正如史蒂夫韦斯顿所指出的, boot() 使用的 parLapply 实际上将作业分成均匀的批次/块。该函数是 clusterApply 的简洁包装器:

docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, 
    fun, ...))

当我扩大重复次数时,这并没有更好的性能,我有点惊讶:

> library(boot)
> set.seed(10)
> x <- runif(1000)
> 
> Reps <- 10^4
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 0.52335 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 3.539357 secs
> 
> Reps <- 10^5
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 5.749831 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 23.06837 secs

我希望这只是由于非常简单的均值函数,并且更复杂的情况表现得更好。我必须承认,我觉得这有点令人不安,因为集群初始化时间在 10.000 和 100.000 的情况下应该是相同的,但是绝对时间差会增加,并且 4 核版本需要 5 倍的时间。我想这一定是列表合并的影响,因为我找不到任何其他解释。

4

1 回答 1

3

如果要并行执行的函数(meaninglessTest在这种情况下)有额外的依赖项(例如myMean),标准的解决方案是通过clusterExport函数将这些依赖项导出到集群。这需要创建一个集群对象并通过boot“cl”参数将其传递给:

library(boot)
library(parallel)
myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}
cl <- makePSOCKcluster(4)
clusterExport(cl, 'myMean')

x <- runif(1000)

bootTest <- function() {
  out <- boot(data=x, statistic=meaninglessTest, R=10000,
              parallel="snow", ncpus=4, cl=cl)
  return(boot.ci(out, type="perc"))
}

bootTest()
stopCluster(cl)

请注意,一旦集群工作人员被初始化,它们可以被boot多次使用并且不需要重新初始化,所以它并不那么昂贵。

要在集群工作人员上加载包,您可以使用clusterEvalQ

clusterEvalQ(cl, library(randomForest))

这很好也很简单,但是对于更复杂的 worker 初始化,我通常会创建一个“worker init”函数并通过它执行它,clusterCall这非常适合在每个 worker 上执行一次函数。

至于您的旁注,性能很差,因为正如您所说,统计功能所做的工作很少,但我不确定您为什么认为工作没有在工作人员之间平均分配。在这种情况下,该parLapply函数用于并行完成工作,它确实均匀且相当有效地拆分工作,但这并不能保证比使用顺序运行更好的性能lapply。但也许我误解了你的问题。

于 2013-07-26T12:41:22.780 回答