3

到目前为止,我发现lapply在 R 中使用并行的最简单方法是通过以下示例代码:

library(parallel)
library(pbapply)

cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)

这有一个非常有用的特性,即为结果提供进度条,并且在不需要并行计算时很容易重用相同的代码,通过设置cl = NULL.

但是,我注意到的一个问题是pblapply正在批量循环列表。例如,如果一个工人在某项任务上卡了很长时间,剩下的工人将等待它完成,然后再开始新的一批工作。对于某些任务,这会为工作流程增加大量不必要的时间。

我的问题: 是否有任何类似的并行框架可以让工作人员独立运行?进度条和重用代码的能力cl=NULL将是一大优势。

也许可以修改现有代码pbapply以添加此选项/功能?

4

2 回答 2

5

(免责声明:我是future框架和progressr包的作者)

一个类似于base::lapply()和您的pbapply::pblapply()示例的紧密解决方案是将future.apply用作:

library(future.apply)

## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
})

分块:您可以通过参数future.chunk.size或补充来控制分块的数量future.schedule。要禁用分块以便在唯一的并行任务中处理每个元素,请使用future.chunk.size=1. 这样,如果有一个元素比其他元素花费的时间长得多,它就不会支撑任何其他元素。

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.chunk.size=1)

并行进度更新: 如果您想在进行并行处理时接收进度更新,您可以使用progressr包并将其配置为使用进度包以进度条的形式报告更新(这里也带有 ETA)。

library(future.apply)
plan(multisession, workers=4)

library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))

with_progress({
  p <- progressor(along=xs)
  results <- future_lapply(xs, FUN=function(x) {
    p()  ## signal progress
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
})

您可以将其包装成一个函数,例如

my_fcn <- function(xs) {
  p <- progressor(along=xs)
  future_lapply(xs, FUN=function(x) {
    p()
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
}

这样您就可以将其作为常规函数调用:

> result <- my_fcn(xs)

并用于plan()精确控制您希望它如何并行化。这不会报告进度。为此,您必须执行以下操作:

> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------]   9%  1m

在后台运行所有内容:如果您的问题是如何在后台运行整个 shebang,请参阅“未来拓扑”小插图。这是另一个级别的并行化,但它是可能的。

于 2020-07-17T02:55:55.807 回答
2

您可以使用在多进程模式下运行的furrr包:futurepurrr

library(furrr)
plan(multisession, workers = nbrOfWorkers()-1)
nbrOfWorkers()
1:100 %>% future_map(~{Sys.sleep(1); rnorm(.x)},.progress = T)
Progress: ──────────────────────────────                                   100%

您可以关闭并行计算plan(sequential)

于 2020-07-15T13:36:25.370 回答