2

我尝试报告我在mclapply中遇到的关于不允许大返回值的错误。

显然,该错误已在开发版本中得到修复,但我对响应者的评论更感兴趣:

序列化对象的大小有 2GB 的限制,例如 mclapply 可以从分叉的进程返回,本示例尝试 16GB。这在 R-devel 中已被取消(对于 64 位版本),但这种用法非常不寻常且效率很低(该示例需要 ca 150GB,因为(取消)序列化涉及的所有副本)

如果使用 mclapply 对大数据进行并行计算效率低下,那么有什么更好的方法呢?我做这种事情的需求只会越来越大,而且我肯定到处都遇到瓶颈。我看到的教程是关于如何使用函数的非常基本的介绍,但不一定是如何有效地使用函数来管理权衡。该文档对这种权衡有一个小小的介绍:

mc.preschedule:如果设置为 'TRUE',则计算首先被划分为(最多)有多少个作业有核心,然后开始作业,每个作业可能覆盖多个值。如果设置为“FALSE”,则为每个“X”值派生一个作业。前者更适合短计算或'X'中的大量值,后者更适合完成时间差异较大且'X'值与'mc.cores'相比没有太多的工作</p>

默认情况下('mc.preschedule = TRUE')输入'X'被分成与核心一样多的部分(当前值按顺序分布在核心中,即第一个值到核心1,第二个到核心2,...... .. (core + 1)-th value to core 1 etc.) 然后将一个进程分叉到每个核心并收集结果。

在没有预先安排的情况下,会为每个“X”值派生一个单独的作业。为了确保一次运行的作业不超过“mc.cores”,一旦该数量被分叉,主进程会在下一次分叉之前等待子进程完成

可靠地对这些事物进行基准测试需要花费大量时间,因为有些问题只会在规模上体现出来,然后很难弄清楚到底发生了什么。因此,更好地了解函数的行为会很有帮助。

编辑:

我没有具体的例子,因为我经常使用 mclapply 并且想更好地了解如何考虑性能影响。虽然写入磁盘可以解决错误,但我认为这对于必须发生的(反)序列化没有帮助,这也必须通过磁盘 IO。

一个工作流程如下:获取一个大的稀疏矩阵M,并将其分块写入磁盘(例如M1-M100),因为 M 本身不适合内存。

现在说,对于其中的每个用户,我想在用户级别添加和聚合i其中ICi列。M对于较小的数据,这将是相对微不足道的:

m = matrix(runif(25), ncol=5)
df = data.frame(I=sample(1:6, 20, replace=T), C=sample(1:5, 20, replace=T))
somefun = function(m) rowSums(m)
res = sapply(sort(unique(df$I)), function(i) somefun(m[,df[df$I == i,]$C]))

但是对于更大的数据,我的方法是根据列所在的矩阵将用户/列的 data.frame 拆分为不同的 data.frames M1-M100,对这些 data.frames 执行并行循环,读入关联的矩阵,然后然后遍历用户,提取列并应用我的函数,然后获取输出列表,再次循环并重新聚合。

如果我有一个不能像那样重新聚合的函数(到目前为止,这不是问题),这并不理想,但我显然用这种方法洗牌了太多数据。

4

2 回答 2

1

我希望我的回答还不算晚,但我认为您的示例可以通过bigmemory包使用共享内存/文件来处理。

让我们创建数据

library(bigmemory)
library(parallel)

#your large file-backed matrix (all values initialized to 0)
#it can hold more than your RAM as it is written to a file
m=filebacked.big.matrix(nrow=5,
                        ncol=5,
                        type="double",
                        descriptorfile="file_backed_matrix.desc",
                        backingfile="file_backed_matrix",
                        backingpath="~")

#be careful how to fill the large matrix with data
set.seed(1234)
m[]=c(matrix(runif(25), ncol=5))
#print the data to the console
m[]

#your user-col mapping
#I have added a unique idx that will be used below
df = data.frame(unique_idx=1:20,
                I=sample(1:6, 20, replace=T),
                C=sample(1:5, 20, replace=T))

#the file-backed matrix that will hold the results
resm=filebacked.big.matrix(nrow=nrow(df),
                           ncol=2,
                           type="double",init = NA_real_,
                           descriptorfile="res_matrix.desc",
                           backingfile="res_backed_matrix",
                           backingpath="~")

#the first column of resm will hold the unique idx of df
resm[,1]=df$unique_idx
resm[]

现在,让我们转到您要执行的功能。您写了rowSums但从您的文字中推断出您的意思colSums。我相应地改变了这一点。

somefun = function(x) {
  #attach the file-backed big.matrix
  #it makes the matrix "known" to the R process (no copying involved!)
  #input
  tmp=attach.big.matrix("~/file_backed_matrix.desc")
  #output
  tmp_out=attach.big.matrix("~/res_matrix.desc")

  #store the output in the file-backed matrix resm
  tmp_out[x$unique_idx,2]=c(colSums(tmp[,x$C,drop=FALSE]))
  #return a little more than the colSum result
  list(pid=Sys.getpid(),
       user=x$I[1],
       col_idx=x$C)
}

在所有内核上进行并行计算

#perform colSums using different threads
res=mclapply(split(df,df$I),somefun,mc.cores = detectCores())

检查结果

#processes IDs
unname(sapply(res,function(x) x$pid))
#28231 28232 28233 28234 28231 28232

#users
unname(sapply(res,function(x) x$user))
#1 2 3 4 5 6 

#column indexes
identical(sort(unname(unlist(sapply(res,function(x) x$col_idx)))),sort(df$C))
#[1] TRUE

#check result of colSums
identical(lapply(split(df,df$I),function(x) resm[x$unique_idx,2]),
          lapply(split(df,df$I),function(x) colSums(m[,x$C,drop=FALSE])))
#[1] TRUE

编辑:我已经在我的编辑中处理了您的评论。将结果存储在文件支持的输出矩阵resm中按预期工作。

于 2016-09-08T21:46:17.170 回答
0

为了限制中等大 N 的开销,使用它几乎总是更好mc.preschedule = TRUE(即将工作分成与内核一样多的块)。

看来您的主要权衡是在内存使用和 CPU 之间。也就是说,您只能并行化,直到正在进行的进程最大化您的 RAM。要考虑的一件事是,不同的工作人员可以在您的 R 会话中读取相同的对象而不会重复。因此,只有在并行函数调用中修改/创建的对象才会为每个内核添加它们的内存占用。

如果您最大化内存,我的建议是将您的整个计算分成多个子作业并依次循环(例如使用 lapply),在该循环中调用 mclapply 以并行化每个子作业,并可能保存输出子作业到磁盘以避免将其全部保存在内存中。

于 2016-09-03T14:54:22.940 回答