5

很长一段时间以来,我一直将 sfLapply 用于我的许多并行 r 脚本。但是,最近随着我对并行计算的深入研究,我一直在使用 sfClusterApplyLB,如果单个实例的运行时间不同,它可以节省大量时间。由于 sfLapply 将等待批次的每个实例完成,然后再加载新批次(这可能导致空闲实例),完成任务的 sfClusterApplyLB 实例将立即分配给列表中的剩余元素,从而可能节省很多当实例不花费完全相同的时间时。这让我质疑为什么我们不想在使用降雪时对我们的运行进行负载平衡?到目前为止我唯一发现的是,当并行脚本中出现错误时,sfClusterApplyLB 在给出错误之前仍会循环遍历整个列表,而 sfLapply 将在尝试第一批后停止。我还缺少什么?负载平衡还有其他成本/缺点吗?下面是一个示例代码,显示了两者之间的区别

rm(list = ls()) #remove all past worksheet variables
working_dir="D:/temp/"
setwd(working_dir)
n_spp=16
spp_nmS=paste0("sp_",c(1:n_spp))
spp_nm=spp_nmS[1]
sp_parallel_run=function(sp_nm){
  sink(file(paste0(working_dir,sp_nm,"_log.txt"), open="wt"))#######NEW
  cat('\n', 'Started on ', date(), '\n') 
  ptm0 <- proc.time()
  jnk=round(runif(1)*8000000) #this is just a redundant script that takes an arbitrary amount of time to run
  jnk1=runif(jnk)
  for (i in 1:length(jnk1)){
    jnk1[i]=jnk[i]*runif(1)
  }
  ptm1=proc.time() - ptm0
  jnk=as.numeric(ptm1[3])
  cat('\n','It took ', jnk, "seconds to model", sp_nm)

  #stop sinks
  sink.reset <- function(){
    for(i in seq_len(sink.number())){
      sink(NULL)
    }
  }
  sink.reset()
}
require(snowfall)
cpucores=as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))

sfInit( parallel=T, cpus=cpucores) # 
sfExportAll() 
system.time((sfLapply(spp_nmS,fun=sp_parallel_run)))
sfRemoveAll()
sfStop()

sfInit( parallel=T, cpus=cpucores) # 
sfExportAll() 
system.time(sfClusterApplyLB(spp_nmS,fun=sp_parallel_run)) 
sfRemoveAll()
sfStop()
4

1 回答 1

3

sfLapply函数很有用,因为它将输入值拆分为每个可用工作人员的一组任务,这就是该mclapply函数所说的preschedulingsfClusterApplyLB与任务不需要很长时间的情况相比,这可以提供更好的性能。

这是一个极端的例子,展示了预调度的优势:

> system.time(sfLapply(1:100000, sqrt))
   user  system elapsed
  0.148   0.004   0.170
> system.time(sfClusterApplyLB(1:100000, sqrt))
   user  system elapsed
 19.317   1.852  21.222
于 2014-02-03T21:20:05.917 回答