很长一段时间以来,我一直将 sfLapply 用于我的许多并行 r 脚本。但是,最近随着我对并行计算的深入研究,我一直在使用 sfClusterApplyLB,如果单个实例的运行时间不同,它可以节省大量时间。由于 sfLapply 将等待批次的每个实例完成,然后再加载新批次(这可能导致空闲实例),完成任务的 sfClusterApplyLB 实例将立即分配给列表中的剩余元素,从而可能节省很多当实例不花费完全相同的时间时。这让我质疑为什么我们不想在使用降雪时对我们的运行进行负载平衡?到目前为止我唯一发现的是,当并行脚本中出现错误时,sfClusterApplyLB 在给出错误之前仍会循环遍历整个列表,而 sfLapply 将在尝试第一批后停止。我还缺少什么?负载平衡还有其他成本/缺点吗?下面是一个示例代码,显示了两者之间的区别
rm(list = ls()) #remove all past worksheet variables
working_dir="D:/temp/"
setwd(working_dir)
n_spp=16
spp_nmS=paste0("sp_",c(1:n_spp))
spp_nm=spp_nmS[1]
sp_parallel_run=function(sp_nm){
sink(file(paste0(working_dir,sp_nm,"_log.txt"), open="wt"))#######NEW
cat('\n', 'Started on ', date(), '\n')
ptm0 <- proc.time()
jnk=round(runif(1)*8000000) #this is just a redundant script that takes an arbitrary amount of time to run
jnk1=runif(jnk)
for (i in 1:length(jnk1)){
jnk1[i]=jnk[i]*runif(1)
}
ptm1=proc.time() - ptm0
jnk=as.numeric(ptm1[3])
cat('\n','It took ', jnk, "seconds to model", sp_nm)
#stop sinks
sink.reset <- function(){
for(i in seq_len(sink.number())){
sink(NULL)
}
}
sink.reset()
}
require(snowfall)
cpucores=as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))
sfInit( parallel=T, cpus=cpucores) #
sfExportAll()
system.time((sfLapply(spp_nmS,fun=sp_parallel_run)))
sfRemoveAll()
sfStop()
sfInit( parallel=T, cpus=cpucores) #
sfExportAll()
system.time(sfClusterApplyLB(spp_nmS,fun=sp_parallel_run))
sfRemoveAll()
sfStop()