3

我在 R 编程世界中有点新,我正在处理一些与(不是那么多)大数据处理的并行化相关的问题。

为此,我使用 data.table 包进行数据存储和处理,并使用 snowfall 包作为包装器来并行化工作。

我举了一个具体的例子:我有一个很大的元素向量,我想在每个元素上应用一个函数 f(我使用向量化版本);然后我将大向量平衡为 N 个部分(较小的向量),如下所示:

 sfInit(parallel = TRUE, cpus = ncpus)
 balancedVector <-myVectorLoadBalanceFunction(myLargeVector, ncpus)
 processedSubVectors <- sfLapply(balancedVector, function(subVector) {
   myVectorizedFunction(subVector)
 })
 sfStop()

我看到奇怪的是,当我从命令行或脚本(即 largeVector 在全局环境中)运行这段代码时,性能在时间方面很好,我在 MS Windows 任务管理器中看到每个核心似乎正在使用与 subVector 大小成比例的内存量;但是当我在函数环境中运行代码(即从命令行调用它并将 largeVector 作为参数传递)时,性能会随着时间的推移而变差,我检查每个核心现在似乎都在使用大矢量...

这有意义吗?

问候

编辑添加一个可重现的例子

只是为了简单起见,一个虚拟示例,其 Date 向量约为 300 MB,具有 +36 M 个元素和一个工作日函数:

library(snowfall)

aSomewhatLargeVector <- seq.Date(from = as.Date("1900-01-01"), to = as.Date("2000-01-01"), by = 1)
aSomewhatLargeVector <- rep(aSomewhatLargeVector, 1000)

# Sequential version to compare

system.time(processedSubVectorsSequential <- weekdays(aSomewhatLargeVector))
# user  system elapsed 
# 108.05    1.06  109.53 

gc() # I restarted R



# Parallel version within a function scope

myCallingFunction = function(aSomewhatLargeVector) {
  sfInit(parallel = TRUE, cpus = 2)
  balancedVector <- list(aSomewhatLargeVector[seq(1, length(aSomewhatLargeVector)/2)],
                         aSomewhatLargeVector[seq(length(aSomewhatLargeVector)/2+1, length(aSomewhatLargeVector))])
  processedSubVectorsParallelFunction <- sfLapply(balancedVector, function(subVector) {
    weekdays(subVector)
  })
  sfStop() 
  processedSubVectorsParallelFunction <- unlist(processedSubVectorsParallelFunction)
  return(processedSubVectorsParallelFunction)
}

system.time(processedSubVectorsParallelFunction <- myCallingFunction(aSomewhatLargeVector))
# user  system elapsed 
# 11.63   10.61   94.27 
# user  system elapsed 
# 12.12    9.09   99.07 

gc() # I restarted R



# Parallel version within the global scope

time0 <- proc.time()
sfInit(parallel = TRUE, cpus = 2)
balancedVector <- list(aSomewhatLargeVector[seq(1, length(aSomewhatLargeVector)/2)],
                       aSomewhatLargeVector[seq(length(aSomewhatLargeVector)/2+1, length(aSomewhatLargeVector))])
processedSubVectorsParallel <- sfLapply(balancedVector, function(subVector) {
  weekdays(subVector)
})
sfStop() 
processedSubVectorsParallel <- unlist(processedSubVectorsParallel)
time1 <- proc.time()
time1-time0
# user  system elapsed 
# 7.94    4.75   85.14 
# user  system elapsed 
# 9.92    3.93   89.69 

我的时间出现在评论中,虽然这个虚拟示例没有那么显着的差异,但可以看出顺序时间>函数内并行>全局并行

此外,您可以看到分配内存的差异:

在此处输入图像描述

3.3 GB < 5.2 GB > 4.4 GB

希望这可以帮助

4

0 回答 0