4

我正在尝试在具有 8 个内核的计算机上使用并行包在Windows上处理 R 中的大量数据。我有一个大的data.frame,我需要逐行处理。对于每一行,我可以估计处理该行需要多长时间,这可以从每行 10 秒到 4 小时不等。

我不想在 clusterApplyLB 函数下一次运行整个程序(我知道这可能是最优化的方法),因为如果它遇到错误,那么我的整个结果集可能会丢失。我第一次尝试运行我的程序涉及将其分解为块,然后单独并行运行每个块,保存该并行运行的输出,然后转到下一个块。

问题在于,当它通过行运行时,而不是以 7 倍“实时”运行(我有 8 个内核,但我想保留一个备用),它似乎只以大约 2 倍的速度运行。我猜这是因为向每个核心分配行效率低下。

例如,2 核的 10 行数据,其中两行可以运行 4 小时,另外两行需要 10 秒。理论上,这可能需要 4 小时 10 秒才能运行,但如果分配效率低下,则可能需要 8 小时。(显然这是夸大其词,但是当估计不正确时,可能会发生类似的情况,因为更多的核心和更多的行)

如果我估计这些时间并以我估计的正确顺序将它们提交给 clusterApplyLB(以使估计的时间分布在内核之间以最大限度地减少所花费的时间),它们可能不会被发送到我希望它们发送的内核是,因为他们可能不会在我估计的时间内完成。例如,我估计两个进程的时间分别为 10 分钟和 12 分钟,它们需要 11.6 分钟和 11.4 秒,那么提交给 clusterApplyLB 的行的顺序将不是我预期的。这种错误可能看起来很小,但是如果我优化了多个长时间的行,那么这种顺序混淆可能会导致两个 4 小时的行转到同一个节点而不是不同的节点(这几乎可以使我的总时间)。

TL;博士。我的问题:有没有办法告诉 R 并行处理函数(例如 clusterApplyLB、clusterApply、parApply 或任何 sapply、lapply 或 foreach 变体)哪些行应该发送到哪个核心/节点?即使没有我发现自己所处的情况,我认为这将是提供信息的非常有用和有趣的事情。

4

2 回答 2

2

我想说有两种不同的可能解决您的问题的方法。

第一个是根据预期的每个作业计算时间对作业到节点映射的静态优化。在开始计算之前,您将为每个作业(即数据框的行)分配一个节点。下面给出了一个可能的实现代码。

第二种解决方案是动态的,您必须根据clusterApplyLB. 您将开始与第一种方法相同,但一旦作业完成,您将不得不重新计算最佳作业到节点的映射。根据您的问题,由于发生的不断重新优化,这可能会增加大量开销。我认为只要您的预期计算时间没有偏差,就没有必要这样做。

这里是第一种解决方案的代码:

library(parallel)
#set seed for reproducible example
set.seed(1234)
#let's say you have 100 calculations (i.e., rows)
#each of them takes between 0 and 1 second computation time
expected_job_length=runif(100)
#this is your data
#real_job_length is unknown but we use it in the mock-up function below
df=data.frame(job_id=seq_along(expected_job_length),
              expected_job_length=expected_job_length,
              #real_job_length=expected_job_length + some noise
              real_job_length=expected_job_length+
                runif(length(expected_job_length),-0.05,0.05))
#we might have a negative real_job_length; fix that
df=within(df,real_job_length[real_job_length<0]<-
            real_job_length[real_job_length<0]+0.05)
#detectCores() gives in my case 4
cluster_size=4

准备作业到节点的映射优化:

#x will give the node_id (between 1 and cluster_size) for each job
total_time=function(x,expected_job_length) {
  #in the calculation below, x will be a vector of reals
  #we have to translate it into integers in order to use it as index vector
  x=as.integer(round(x))
  #return max of sum of node-binned expected job lengths
  max(sapply(split(expected_job_length,x),sum))
}

#now optimize the distribution of jobs amongst the nodes
#Genetic algorithm might be better for the optimization
#but Differential Evolution is good for now
library(DEoptim)
#pick large differential weighting factor (F) ...
#... to get out of local minimas due to rounding
res=DEoptim(fn=total_time,
            lower=rep(1,nrow(df)),
            upper=rep(cluster_size,nrow(df)),
            expected_job_length=expected_job_length,
            control=DEoptim.control(CR=0.85,F=1.5,trace=FALSE))
#wait for a minute or two ...
#inspect optimal solution
time_per_node=sapply(split(expected_job_length,
                           unname(round(res$optim$bestmem))),sum)
time_per_node
#       1        2        3        4 
#10.91765 10.94893 10.94069 10.94246
plot(time_per_node,ylim=c(0,15))
abline(h=max(time_per_node),lty=2)

#add node-mapping to df
df$node_id=unname(round(res$optim$bestmem))

现在是在集群上进行计算的时候了:

#start cluster
workers=parallel::makeCluster(cluster_size)

start_time=Sys.time()
#distribute jobs according to optimal node-mapping
clusterApply(workers,split(df,df$node_id),function(x) {
  for (i in seq_along(x$job_id)) {
    #use tryCatch to do the error handling for jobs that fail
    tryCatch({Sys.sleep(x[i,"real_job_length"])},
             error=function(err) {print("Do your error handling")})
  }
})
end_time=Sys.time()

#how long did it take
end_time-start_time
#Time difference of 11.12532 secs

#add to plot
abline(h=as.numeric(end_time-start_time),col="red",lty=2)

stopCluster(workers)
于 2016-09-14T18:42:18.917 回答
-1

根据 input ,您似乎已经在该任务中保存了该任务的输出。假设每个并行任务都将输出保存为文件,您可能需要一个初始函数来预测特定行的时间。为了做到这一点

  • 生成具有估计时间和行号的结构
  • 对估计的时间和重新排序的行进行排序,并为每个重新排序的行运行并行过程。

这将自动平衡工作量。我们有一个类似的问题,该过程必须逐列完成,每列需要10-200 秒。因此,我们生成了一个函数来估计时间,根据该函数对列进行重新排序,并为每一列运行并行处理。

于 2017-06-01T21:02:02.160 回答