23

I am using the R package foreach() with %dopar% to do long (~days) calculations in parallel. I would like the ability to stop the entire set of calculations in the event that one of them produces an error. However, I have not found a way to achieve this, and from the documentation and various forums I have found no indication that this is possible. In particular, break() does not work and stop() only stops the current calculation, not the whole foreach loop.

Note that I cannot use a simple for loop, because ultimately I want to parallelize this using the doRNG package.

Here is a simplified, reproducible version of what I am attempting (shown here in serial with %do%, but I have the same problem when using doRNG and %dopar%). Note that in reality I want to run all of the elements of this loop (here 10) in parallel.

library(foreach)
myfunc <- function() {
  x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% {
    cat("Element ", k, "\n")
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
    if(is.element(k, 2:6)) {
      cat("Should stop\n")
      stop("Has stopped")
    }
    k
  }
  return(x)
}
x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"

What I would like to achieve is that the entire foreach loop can be exited immediately upon some condition (here, when the stop() is encountered).

I have found no way to achieve this with foreach. It seems that I would need a way to send a message to all the other processes to make them stop too.

If not possible with foreach, does anyone know of alternatives? I have also tried to achieve this with parallel::mclapply, but that does not work either.

> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)

locale:
[1] C/UTF-8/C/C/C/C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods base

other attached packages:
[1] foreach_1.4.0

loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0  iterators_1.0.6
4

6 回答 6

13

听起来您想要一个不耐烦的“停止”错误处理版本。您可以通过编写自定义组合函数来实现它,并安排在foreach每个结果返回后立即调用它。为此,您需要:

  • 使用支持combine即时调用的后端,例如doMPIdoRedis
  • 不启用.multicombine
  • 设置.inorderFALSE
  • 设置.init为某事(如NULL

这是一个这样做的例子:

library(foreach)
parfun <- function(errval, n) {
  abortable <- function(errfun) {
    comb <- function(x, y) {
      if (inherits(y, 'error')) {
        warning('This will leave your parallel backend in an inconsistent state')
        errfun(y)
      }
      c(x, y)
    }
    foreach(i=seq_len(n), .errorhandling='pass', .export='errval',
            .combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
      if (i == errval)
        stop('testing abort')
      Sys.sleep(10)
      i
    }
  }
  callCC(abortable)
}

请注意,我还将错误处理设置为“通过”,因此foreach将使用错误对象调用组合函数。该callCC函数用于从foreach循环返回,而不管内部foreach和后端使用的错误处理。在这种情况下,callCC将调用该abortable函数,并传递给它一个用于强制callCC立即返回的函数对象。foreach通过从 combine 函数中调用该函数,我们可以在检测到错误对象时退出循环,并callCC返回该对象。有关?callCC更多信息,请参阅。

您实际上可以在parfun没有注册并行后端的情况下使用,并在执行引发错误的任务时验证foreach循环是否“中断”,但这可能需要一段时间,因为任务是按顺序执行的。例如,如果没有注册后端,这需要 20 秒才能执行:

print(system.time(parfun(3, 4)))

parfun并行执行时,我们需要做的不仅仅是跳出foreach循环:我们还需要停止工作人员,否则他们将继续计算分配给他们的任务。使用doMPI,可以使用以下方法停止工作人员mpi.abort

library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) {
  cat(sprintf('Caught error: %s\n', conditionMessage(r)))
  mpi.abort(cl$comm)
}

请注意,在循环中止后不能使用集群对象,因为事情没有正确清理,这就是为什么正常的“停止”错误处理不能以这种方式工作的原因。

于 2013-04-19T19:22:35.643 回答
4

这不是您问题的直接答案,但是when()如果满足条件,使用您可以避免进入循环:

x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:%
  when( !is.element(k, 2:6) ) %do%
  {
    cat("Element ", k, "\n")
    Sys.sleep(0.5)
    k
  }

编辑:

我忘记了一些事情:我认为这是设计使然,您不能仅仅停止 foreach 循环。如果您并行运行循环,则每个回合都是独立处理的,这意味着当您停止整个循环时,k=2如果进程k=1已经终止或仍在运行,则无法预测。因此,使用when()条件会给你一个确定的结果。

编辑 2:考虑您的评论的另一个解决方案。

shouldStop <- FALSE
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do%
  {
    if( !shouldStop ){
      # put your time consuming code here
      cat("Element ", k, "\n")
      Sys.sleep(0.5)
      shouldStop <- shouldStop ||  is.element(k, 2:6)
      k
    }
  }

使用此解决方案,在停止条件为真时正在运行的进程仍会计算到结束,但您可以避免所有即将到来的进程的时间消耗。

于 2013-04-18T11:07:01.410 回答
2

当我到达终端循环时,我没有尝试跳出循环,而是将一个小文件写入磁盘,然后根据该文件的存在简单地跳过所有剩余的迭代。

检查文件是否存在花费我们不到一毫秒的计算时间。

# 1.4 seconds to check if a file exists a million times
system.time(lapply(1:1e6, function(x) file.exists("checker.txt")))
   user  system elapsed 
  1.204   0.233   1.437 

当您没有固定数量的迭代或您的过程可以在所有迭代完成之前完成(例如收敛)时,这非常有用

library(foreach)

alist <- foreach(i = 1:5000) %dopar% { 
  if(file.exists("checker.txt")) {
    return(NULL)
  } else {
    if(i = 20) {
      write("", "checker.txt") # write an empty file
    }
    return(i)
  }
}

file.remove("checker.txt")

这样做的好处是,即使你的列表非常长,如果你只是 unlist() 你也只能得到值。

> length(alist)
[1] 5000

> unlist(res)
 [1]  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20

不要费心尝试打破,而是“跳过其余部分”!

于 2020-08-11T02:27:31.590 回答
1

我从 REvolution 技术支持得到的答案是:“不——foreach 目前没有办法停止对任何人的错误的所有并行计算”。

于 2013-04-24T15:37:36.847 回答
0

我没有太多运气foreach去做我想做的事,所以这里有一个使用parallel似乎可以做我想做的事情的包的解决方案。我使用intermediate选项 inmcparallel()将结果从我的函数do.task(), 立即传递给函数check.res()。如果do.task()抛出错误,则用于check.res()触发调用tools::pskill以显式杀死所有工作人员。这可能不是很优雅,但它的工作原理是它会立即停止所有工作。此外,我可以简单地do.task()从当前环境中继承处理所需的所有变量。(实际上do.task()是一个更复杂的函数,需要传入许多变量。)

library(parallel)

# do.task() and check.res() inherit some variables from enclosing environment

do.task <- function(x) {
  cat("Starting task", x, "\n")
  Sys.sleep(5*x)
  if(x==stopat) { 
    stop("Error in job", x) # thrown to mccollect() which sends it to check.res()
  }
  cat("  Completed task", x, "\n")
  return(10*x)
}

check.res <- function(r) { # r is list of results so far
  cat("Called check.res\n")
  sendKill <- FALSE
  for(j in 1:Njob) { # check whether need to kill
    if(inherits(r[[j]], 'try-error')) {
      sendKill <- TRUE
    }
  }
  if(sendKill) { # then kill all
    for(j in 1:Njob) {
      cat("Killing job", job[[j]]$pid, "\n") 
      tools::pskill(job[[j]]$pid) # mckill not accessible
    }
  }
}

Tstart <- Sys.time()
stopat <- 3
Njob <- 4
job <- vector("list", length=Njob)
for(j in 1:Njob) {
  job[[j]]<- mcparallel(do.task(j))
}
res <- mccollect(job, intermediate=check.res) # res is in order 1:Njob, regardless of how long jobs took
cat("Collected\n")
Tstop <- Sys.time()
print(difftime(Tstop,Tstart))
for(j in 1:Njob) {
  if(inherits(res[[j]], 'try-error')) {
    stop("Parallel part encountered an error")
  }
}

这给出了以下屏幕转储和变量结果res

> source("exp5.R")
Starting task 1 
Starting task 2 
Starting task 3 
Starting task 4 
  Completed task 1 
Called check.res
Called check.res
  Completed task 2 
Called check.res
Called check.res
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Collected
Time difference of 15.03558 secs
Error in eval(expr, envir, enclos) : Parallel part encountered an error
> res
$`21423`
[1] 10

$`21424`
[1] 20

$`21425`
[1] "Error in do.task(j) : Error in job3\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in do.task(j): Error in job3>

$`21426`
NULL
于 2013-04-24T10:21:46.103 回答
-1

史蒂夫韦斯顿的原始答案基本上回答了这个问题。但这里是他的答案的一个稍微修改过的版本,它还保留了我需要的两个附加功能:(1)随机数生成;(2) 打印运行时诊断。

suppressMessages(library(doMPI))

comb <- function(x, y) {
  if(inherits(y, 'error')) {
    stop(y)
  }
  rbind(x, y) # forces the row names to be 'y'
}

myfunc <- function() {
  writeLines(text="foreach log", con="log.txt")
  foreach(i=1:12, .errorhandling='pass', .combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
    set.seed(100)
    sink("log.txt", append=TRUE)
    if(i==6) {
      stop('testing abort')
    }
    Sys.sleep(10)
    cat("Completed task", i, "\n")
    sink(NULL)
    rnorm(5,mean=i)
  }
}

myerr <- function(e) {
  cat(sprintf('Caught error: %s\n', conditionMessage(e)))
  mpi.abort(cl$comm)
}

cl <- startMPIcluster(4)
registerDoMPI(cl)
r <- tryCatch(myfunc(), error=myerr)
closeCluster(cl)

获取此文件时,它会按预期退出并显示错误消息

> source("exp2.R")
    4 slaves are spawned successfully. 0 failed.
Caught error: testing abort
[ganges.local:16325] MPI_ABORT invoked on rank 0 in communicator  with errorcode 0

'log.txt' 文件提供了直到错误点的正确诊断,然后提供额外的错误信息。至关重要的是,一旦遇到 foreach 循环中的 stop(),所有任务的执行就会停止:它不会等到整个 foreach 循环完成。因此,我最多只能看到 i=4 的“已完成任务”消息。(请注意,如果 Sys.sleep() 更短,那么稍后的任务可能会在处理 mpi.abort() 之前启动。)

如果我将停止条件更改为“i==100”,则不会触发停止,因此不会触发错误。代码成功存在且没有错误信息,并且 r 是一个维度为 12*5 的二维数组。

顺便说一句,似乎我实际上并不需要 .inorder=FALSE (我认为如果发现错误,这只会让我的速度略有提高)。

于 2013-04-23T07:28:22.410 回答