(免责声明:我是future框架和progressr包的作者)
一个类似于base::lapply()
和您的pbapply::pblapply()
示例的紧密解决方案是将future.apply用作:
library(future.apply)
## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
})
分块:您可以通过参数future.chunk.size
或补充来控制分块的数量future.schedule
。要禁用分块以便在唯一的并行任务中处理每个元素,请使用future.chunk.size=1
. 这样,如果有一个元素比其他元素花费的时间长得多,它就不会支撑任何其他元素。
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
并行进度更新:
如果您想在进行并行处理时接收进度更新,您可以使用progressr包并将其配置为使用进度包以进度条的形式报告更新(这里也带有 ETA)。
library(future.apply)
plan(multisession, workers=4)
library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))
with_progress({
p <- progressor(along=xs)
results <- future_lapply(xs, FUN=function(x) {
p() ## signal progress
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
})
您可以将其包装成一个函数,例如
my_fcn <- function(xs) {
p <- progressor(along=xs)
future_lapply(xs, FUN=function(x) {
p()
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
}
这样您就可以将其作为常规函数调用:
> result <- my_fcn(xs)
并用于plan()
精确控制您希望它如何并行化。这不会报告进度。为此,您必须执行以下操作:
> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------] 9% 1m
在后台运行所有内容:如果您的问题是如何在后台运行整个 shebang,请参阅“未来拓扑”小插图。这是另一个级别的并行化,但它是可能的。