4

我目前正在开发一个 R 包,它将通过“并行”包使用并行计算来解决一些任务。

当使用在我的包的函数中定义的集群时,我遇到了一些非常尴尬的行为,其中 parLapply 函数将作业分配给工作人员并等待它完成将工作分配给下一个工作人员。或者至少这似乎是正在发生的事情,通过观察日志文件“cluster.log”和 unix shell 中正在运行的进程列表。

下面是我的包中声明的原始函数的模型版本:

.parSolver <- function( varMatrix, var1 ) {

    no_cores <- detectCores()

    #Rows in varMatrix
    rows <- 1:nrow(varMatrix[,])

    # Split rows in n parts
    n <- no_cores
    parts <- split(rows, cut(rows, n))

    # Initiate cluster
    cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
    clusterEvalQ(cl, library(raster))
    clusterExport(cl, "varMatrix", envir=environment())
    clusterExport(cl, "var1", envir=environment())


    rParts <- parLapply(cl = cl, X = 1:n, fun = function(x){
        part <- rasterize(varMatrix[parts[[x]],], raster(var1), .....)
        print(x)
        return(part)
        })

    do.call(merge, rParts)
}

笔记:

  • 我使用 makePSOCKcluster 是因为我希望代码在 Windows 和 unix 系统上运行,尽管这个特殊问题只在 unix 系统中表现出来。
  • 函数 rasterize 和 raster 在 library(raster) 中定义,导出到集群。

对我来说奇怪的部分是,如果我在全局环境中执行函数 parSolver 的完全相同的代码,每件事都会顺利进行,所有工作人员同时从事一项工作,并且任务立即完成。但是,如果我这样做:

library(myPackage)

varMatrix <- (...)
var1 <- (...)
result <- parSolver(varMatrix, var1)

出现描述的问题。

这似乎是一个负载平衡问题,但这并不能解释为什么它在一种情况下可以正常工作,而在另一种情况下却不行。

我在这里错过了什么吗?提前致谢。

4

1 回答 1

3

我不认为parLapply是按顺序运行的。更有可能的是,它只是运行效率低下,使它看起来是按顺序运行的。

我有一些改进它的建议:

  • 不要在里面定义worker函数parSolver
  • 不要全部导出varMatrix给每个工人
  • 在外部创建集群parSolver

第一点很重要,因为就像您现在的示例一样,定义在 中的所有变量parSolver都将与匿名工作函数一起序列化,并通过 发送给工作人员parLapply。通过在任何函数之外定义辅助函数,序列化不会捕获任何不需要的变量。

第二点避免了不必要的套接字 I/O 并使用更少的内存,使代码更具可扩展性。

这是一个与您的相似的虚假但独立的示例,它展示了我的建议:

# Define worker function outside of any function to avoid
# serialization problems (such as unexpected variable capture)
workerfn <- function(mat, var1) {
    library(raster)
    mat * var1
}

parSolver <- function(cl, varMatrix, var1) {
    parts <- splitIndices(nrow(varMatrix), length(cl))
    varMatrixParts <- lapply(parts, function(i) varMatrix[i,,drop=FALSE])
    rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
    do.call(rbind, rParts)
}

library(parallel)
cl <- makePSOCKcluster(3)
r <- parSolver(cl, matrix(1:20, 10, 2), 2)
print(r)

请注意,这利用了clusterApply函数来遍历行块列表,varMatrix这样就不需要将整个矩阵发送给每个人。它还避免了对clusterEvalQand的调用clusterExport,从而简化了代码,并提高了它的效率。

于 2017-08-01T21:16:53.637 回答