这些使用%dopar%
和foreach()
或times()
来自foreach
包的方法还确保您的迭代按顺序开始。它们类似于clusterApply
@mrip 描述的方法。
doParallel 和 doSNOW 方法
如果您想使用doParallel
or doSNOW
,将通过不将每次迭代预分配给核心来foreach
平衡负载。(免责声明,我正在多核 Mac 上进行测试)。
在这里测试:
library(doParallel)
cl <- makeCluster(2)
registerDoParallel(cl)
# or
# library(doSNOW)
# cl <- makeCluster(2, type="SOCK")
# registerDoSNOW(cl)
foreach(i=1:6, duration=c(1,2,5,6,1,1), .combine=rbind) %dopar% {
start=Sys.time()
Sys.sleep(sleep_durations[i])
c(index=i, pid=Sys.getpid(), duration=sleep_durations[i], start=start, end=Sys.time())
}
stopCluster(cl)
index pid duration start end
# result.1 1 97167 1 1495651947 1495651948
# result.2 2 97175 5 1495651947 1495651952
# result.3 3 97167 2 1495651948 1495651950
# result.4 4 97167 6 1495651950 1495651956
# result.5 5 97175 1 1495651952 1495651953
# result.6 6 97175 1 1495651953 1495651954
DOMC方法
如果您正在使用doMC
,times(n) %dopar% { code block }
将运行代码块n
时间。这将分配给可用的核心,而不是平均分配每个核心的作业数量(这是 @steve-weston 的doMC
包在 期间所做的foreach
)。
这里的问题是times()
不允许您指定与迭代相关的参数。此外,doMC
的环境分叉不允许您写入共享内存。
如果您在单个节点上工作,则可以使用文件系统作为跟踪迭代的锁定机制。
lock <- tempdir(); dir.create(lock) # Empty dir before re-running the loop
times(6000) %dopar% {
# Sys.sleep(runif(1)); # Uncomment if lock fails (iteration sequences contains skips, followed by repeats, e.g. 1, 3, 3, 4) in attempt to stagger processes
file.create(file.path(lock, runif(1))); # Filename can be anything, but must be unique
count <- length(list.files(lock));
someFunction(params$ints[count],params$strings[count])
}
在这里测试:
library(doMC)
registerDoMC(cores=2)
sleep_durations <- c(1, 5, 2, 6, 1, 1)
napVerbosely <- function(pid, duration) {
print(sprintf("[pid %s] Sleeping for %s sec", pid, duration))
Sys.sleep(duration)
}
lock <- tempdir()
dir.create(lock)
times(6) %dopar% {
file.create(file.path(lock, runif(1)));
count <- length(list.files(lock));
napVerbosely(Sys.getpid(), sleep_durations[count])
}
# [1] "[pid 96874] Sleeping for 1 sec"
# [1] "[pid 96875] Sleeping for 5 sec"
# [1] "[pid 96874] Sleeping for 2 sec"
# [1] "[pid 96874] Sleeping for 6 sec"
# [1] "[pid 96875] Sleeping for 1 sec"
# [1] "[pid 96875] Sleeping for 1 sec"
# NULL