2

我在 R 降雪包中使用 sfApply 进行并行计算。有 32000 个测试要运行。该代码在开始计算时工作正常,它将创建 46 个 Rscript.exe 进程,每个 Rscript.exe 有 2% 的 cpu 使用率。整体 cpu 使用率约为 100%,结果不断写入磁盘。计算通常需要数十个小时。奇怪的是,Rscript.exe进程一个接一个地变得不活跃(cpu使用率=0),对应的cpu也是不活跃的。两天后,通过查看 cpu 使用率,只有一半的 Rscript.exe 处于活动状态,整体 cpu 使用率降低到 50%。然而,这项工作距离完成还很遥远。随着时间的推移,越来越多的 Rscript.exe 处于非活动状态,这使得工作持续时间非常非常长。我想知道是什么让进程和 cpu 核心变得不活跃?

我的电脑有 46 个逻辑核心。我在 64 位 Windows 7 中使用来自 Rstudio 的 R-3.4.0。以下“测试”变量是 32000*2 矩阵。myfunction 正在求解几个微分方程。

谢谢。

    library(snowfall)
    sfInit(parallel=TRUE, cpus=46)
    Sys.time()
    sfLibrary(deSolve)
    sfExport("myfunction","test")
    res<-sfApply(test,1,function(x){myfunction(x[1],x[2])})
    sfStop()
    Sys.time()
4

1 回答 1

2

您所描述的内容听起来很合理,因为在内部snowfall::sfApply()使用snow::parApply(),它将您的数据 ( test) 分成(此处)46 个块,并将每个块发送给 46 个 R 工作人员之一。当一个工人完成它的块时,它就没有更多的工作了,它只会闲置而其余的块由其他工人处理。

您想要做的是将您的数据分成更小的块,这将导致每个工作人员平均处理超过一个块。我不知道(认为?)降雪是否可能。并行包,它是 R 本身的一部分,它取代了雪包(降雪所依赖的),提供parApply()parApplyLB()后者将你的块分成最小大小的地方,即每个数据元素一个(的test)。详情请参阅help("parApply", package = "parallel")

future.apply包(我是作者)为您提供了扩展数据拆分量的选项它不提供apply()版本,而是lapply()您可以使用的版本(以及parApply()内部工作方式)。例如,每个工人使用一个块的示例将是:

library(future.apply)
plan(multisession, workers = 46L)

## Coerce matrix into list with one element per matrix row
test_rows <- lapply(seq_len(nrow(test)), FUN = function(row) test[row,])

res <- future_lapply(test_rows, FUN = function(x) { 
  myfunction(x[1],x[2])
})

默认为

res <- future_lapply(test_rows, FUN = function(x) { 
  myfunction(x[1],x[2])
}, future.scheduling = 1.0)

如果您想拆分数据,以便每个工作人员一次处理一行(参见parallel::parApplyLB()),您可以这样做:

res <- future_lapply(test_rows, FUN = function(x) { 
  myfunction(x[1],x[2])
}, future.scheduling = Inf)

通过设置future.scheduling[1, Inf],您可以控制平均块大小的大小。例如,future.scheduling = 2.0将让每个工作人员在future_lapply()返回之前平均处理两个数据块。

编辑 2021-11-08:thefuture_lapply()和朋友现在在future.apply包中(最初在future中)。

于 2017-06-24T22:13:23.880 回答