6

我在分别具有 4 个和 8 个物理内核和逻辑内核的 PC(OS Linux)上运行以下代码(从doParallel 的 Vignettes中提取)。

使用或更少运行代码iter=1e+6,一切都很好,我可以从 CPU 使用情况中看到,所有内核都用于此计算。然而,随着迭代次数的增加(例如iter=4e+6),并行计算似乎在这种情况下不起作用。当我还监控 CPU 使用率时,只有一个核心参与计算(100% 使用率)。

示例 1

require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]

你知道可能是什么原因吗?记忆可能是原因吗?

我四处搜索,发现与我的问题有关,但关键是我没有遇到任何错误,而且 OP 似乎通过在foreach循环中提供必要的包来提出解决方案。但是可以看出,我的循环中没有使用任何包。

更新1

我的问题仍然没有解决。根据我的实验,我不认为记忆可能是原因。我在运行以下简单并行(在所有 8 个逻辑内核上)迭代的系统上有 8GB 内存:

示例 2

require("doParallel")
require("foreach")

registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        i
    }
})[3]

我运行这段代码没有问题,但是当我监控 CPU 使用率时,只有一个核心(共 8 个)是 100%。

更新2

至于Example2,@SteveWeston(感谢您指出这一点)表示(在评论中):“您更新中的示例遇到了小任务。只有主人有任何真正的工作要做,包括发送任务和处理“

但是,Example1仍未解决。当我运行它并使用 监视进程htop时,更详细的情况如下:

p1让我们通过命名所有 8 个创建的进程p8。状态(中的列Shtop表示p1R正在运行并且保持不变。然而,p2直到p8,几分钟后,状态变为D(即不间断睡眠),几分钟后,再次变为Z(即终止但未被其父级收割)。你知道为什么会这样吗?

4

2 回答 2

5

我认为你的内存不足。这是该示例的修改版本,当您有很多任务时应该会更好。它使用 doSNOW 而不是 doParallel,因为 doSNOW 允许您在工作人员返回时使用 combine 函数处理结果。此示例将这些结果写入文件以使用更少的内存,但是它在最后使用“.final”函数将结果读回内存,但如果您没有足够的内存,您可以跳过它。

library(doSNOW)
library(tcltk)
nw <- 4  # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)

x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000  # may require tuning for your machine
maxcomb <- nw + 1  # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)

comb <- function(fobj, ...) {
  for(r in list(...))
    writeBin(r, fobj)
  fobj
}

final <- function(fobj) {
  close(fobj)
  t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}

mkprogress <- function(total) {
  pb <- tkProgressBar(max=total,
                      label=sprintf('total tasks: %d', total))
  function(n, tag) {
    setTkProgressBar(pb, n,
      label=sprintf('last completed task: %d of %d', tag, total))
  }
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')

r <-
  foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
          .maxcombine=maxcomb, .init=resultFile, .final=final,
          .inorder=FALSE, .options.snow=opts) %dopar% {
    do.call('c', lapply(seq_len(n), function(i) {
      ind <- sample(100, 100, replace=TRUE)
      result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
      coefficients(result1)
    }))
  }

我包含了一个进度条,因为这个示例需要几个小时才能执行。

请注意,此示例还使用包中的idiv函数iterators来增加每个任务的工作量。这种技术称为分块,通常可以提高并行性能。但是,使用idiv会弄乱任务索引,因为该变量i现在是每个任务的索引而不是全局索引。对于全局索引,您可以编写一个包含以下内容的自定义迭代器idiv

idivix <- function(n, chunkSize) {
  i <- 1
  it <- idiv(n, chunkSize=chunkSize)
  nextEl <- function() {
    m <- nextElem(it)  # may throw 'StopIterator'
    value <- list(i=i, m=m)
    i <<- i + m
    value
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

此迭代器发出的值是列表,每个列表都包含一个起始索引和一个计数。这是一个使用此自定义迭代器的简单 foreach 循环:

r <- 
  foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
    do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
      i
    }))
  }

当然,如果任务的计算量足够大,您可能不需要分块,并且可以像原始示例中那样使用简单的 foreach 循环。

于 2016-06-12T01:11:53.980 回答
3

起初我以为您遇到了内存问题,因为提交许多任务确实会使用更多内存,这最终会导致主进程陷入困境,所以我的原始答案显示了几种使用更少内存的技术。但是,现在听起来好像有一个启动和关闭阶段,只有主进程在忙,而工作人员在中间忙了一段时间。我认为问题在于这个示例中的任务并不是真正的计算密集型,所以当你有很多任务时,你开始真正注意到启动和关闭时间。我对实际计算进行计时,发现每个任务只需要大约 3 毫秒。过去,你不会从并行计算中获得任何好处,任务那么小,但现在,取决于你的机器,

我仍然认为我的其他答案对这个问题很有效,但是由于你有足够的内存,所以它是矫枉过正的。使用分块的最重要技术。这是一个使用分块的示例,对原始示例的更改最少:

require("doParallel")
nw <- 8
registerDoParallel(nw)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
niter <- 4e+6
r <- foreach(n=idiv(niter, chunks=nw), .combine='rbind') %dopar% {
  do.call('rbind', lapply(seq_len(n), function(i) {
    ind <- sample(100, 100, replace=TRUE)
    result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
    coefficients(result1)
  }))
}

请注意,这与我的其他答案略有不同。chunks通过使用 idiv选项而不是该选项,它只使用每个工作人员一个任务chunkSize。这减少了 master 完成的工作量,如果你有足够的内存,这是一个很好的策略。

于 2016-06-15T14:46:15.500 回答