0

我想做的事情的大纲如下:

#function to be run about 6k times, doesn't return anything
someFunction <- function(integer,string)
{...}
#parameters to be passed
params <- data.frame(ints=1:6000, strings=...)
#number of cores the system has
numCores <- detectCores()

count <- 1;
#run while there are still tasks to be run and there is a free core
while(count <= 6000 && <# of running tasks < numCores>)
{
   count <- count + 1;
   <assign someFunction(params$ints[count],params$strings[count]) to free core>
}

<> 括号中的两段代码是我不确定如何进行的地方。我在 Ubuntu 系统上,所以我可以使用多核包。

我怀疑我可能需要一个整数来跟踪正在使用的内核数量,当一个进程完成时,它只是从这个整数中减去一个,一个新进程加一个,或者可能是一个长度数组numCores来保持 0 或 1 如果它是否被使用。

4

3 回答 3

4

R 中的并行编程包往往比您描述的要高。由于您没有使用 Windows,因此您可以使用parallel包中的“多核”API:

library(parallel)
someFunction <- function(int, string) {
  paste0(string, int)
}
params <- data.frame(ints=1:6000,
                     strings=rep(letters, length.out=6000))
numCores <- detectCores()

r <- mcmapply(someFunction, params$ints, params$strings,
              SIMPLIFY=FALSE, mc.cores=numCores)

这将在列表中返回 6000 个结果r

另一种选择是foreach包装。在您的情况下,我推荐它,doMC因为它还使用parallel包中的“多核”API。首先你加载并注册doMC

library(doMC)
registerDoMC(numCores)

然后使用foreachand并行%dopar%调用,以列表形式返回结果:someFunction

r <- foreach(i=params$ints, s=params$strings) %dopar% {
  someFunction(i, s)
}

在这两种情况下,您都不会直接管理如何在核心上安排任务。默认情况下,这两个示例都预先安排了任务,如果任务都花费相同的时间来计算,这通常会更有效,但如果需要负载平衡,可以将其关闭。

于 2013-09-24T16:04:16.893 回答
1

这些使用%dopar%foreach()times()来自foreach包的方法还确保您的迭代按顺序开始。它们类似于clusterApply@mrip 描述的方法。

doParallel 和 doSNOW 方法

如果您想使用doParallelor doSNOW,将通过将每次迭代预分配给核心来foreach平衡负载。(免责声明,我正在多核 Mac 上进行测试)。

在这里测试:

library(doParallel)
cl <- makeCluster(2)
registerDoParallel(cl)

# or 
# library(doSNOW)
# cl <- makeCluster(2, type="SOCK")
# registerDoSNOW(cl)

foreach(i=1:6, duration=c(1,2,5,6,1,1), .combine=rbind) %dopar% {
    start=Sys.time()
    Sys.sleep(sleep_durations[i])
    c(index=i, pid=Sys.getpid(), duration=sleep_durations[i], start=start, end=Sys.time())
}
stopCluster(cl)
         index   pid duration      start        end
# result.1     1 97167        1 1495651947 1495651948
# result.2     2 97175        5 1495651947 1495651952
# result.3     3 97167        2 1495651948 1495651950
# result.4     4 97167        6 1495651950 1495651956
# result.5     5 97175        1 1495651952 1495651953
# result.6     6 97175        1 1495651953 1495651954

DOMC方法

如果您正在使用doMCtimes(n) %dopar% { code block }将运行代码块n时间。这将分配给可用的核心,而不是平均分配每个核心的作业数量(这是 @steve-weston 的doMC包在 期间所做的foreach)。

这里的问题是times()不允许您指定与迭代相关的参数。此外,doMC的环境分叉不允许您写入共享内存。

如果您在单个节点上工作,则可以使用文件系统作为跟踪迭代的锁定机制。

lock <- tempdir(); dir.create(lock)  # Empty dir before re-running the loop
times(6000) %dopar% { 
#   Sys.sleep(runif(1));  # Uncomment if lock fails (iteration sequences contains skips, followed by repeats, e.g. 1, 3, 3, 4) in attempt to stagger processes 
    file.create(file.path(lock, runif(1)));  # Filename can be anything, but must be unique
    count <- length(list.files(lock));
    someFunction(params$ints[count],params$strings[count])
}

在这里测试:

library(doMC)
registerDoMC(cores=2)
sleep_durations <- c(1, 5, 2, 6, 1, 1)

napVerbosely <- function(pid, duration) {
    print(sprintf("[pid %s] Sleeping for %s sec", pid, duration))
    Sys.sleep(duration)
}

lock <- tempdir()
dir.create(lock)
times(6) %dopar% {
    file.create(file.path(lock, runif(1)));
    count <- length(list.files(lock));
    napVerbosely(Sys.getpid(), sleep_durations[count])    
}
# [1] "[pid 96874] Sleeping for 1 sec"
# [1] "[pid 96875] Sleeping for 5 sec"
# [1] "[pid 96874] Sleeping for 2 sec"
# [1] "[pid 96874] Sleeping for 6 sec"
# [1] "[pid 96875] Sleeping for 1 sec"
# [1] "[pid 96875] Sleeping for 1 sec"
# NULL
于 2017-05-24T18:19:41.650 回答
0

您应该使用多核或雪包。有了雪,你可以很简单地完成你所追求的:

clust<-makeCluster(detectCores())
clusterExport(clust,"someFunction") ## added on edit, see comment by Steve Weston
clustFun<-function(i,params){someFunction(params$ints[i],params$strings[i])}
clusterApply(clust,1:6000,clustFun,params)

这将并行评估调用并将结果分配给长度为 6000 的列表。您还可以使用clusterApplyLBwhich 执行类似于您所描述的负载平衡。

http://cran.r-project.org/web/packages/snow/index.html

于 2013-09-24T16:03:58.827 回答