0

我正在使用 R 和 Rmpi​​ 中的并行处理工具对我大学的 Unix 集群进行分析。当我使用 snow 包或仅使用标准 apply 函数在本地(Windows)机器上运行下面显示的 R 脚本时,它非常慢,但似乎工作正常。当我使用 Rmpi​​ 在 Unix 集群上运行它时,我收到一个错误“序列化太大而无法存储在原始向量中”。这篇文章似乎是正确的,但我来回传递的向量(50K)不应该接近触发这个错误。此外,我可以通过将数据分成更小的部分来让事情部分发挥作用。我认为问题一定是与 Rmpi​​ 相关的某种内存泄漏,但我没有调试这种东西所需的工具。希望您至少可以为我指明调试这里发生的事情的工具的方向。

我正在使用的数据有大约 250 万条记录,每条记录有大约 40 列。我向 parApply 提交了一个 50K x 40 的数据帧,它返回一个 50K 的数值向量。

#In an effort to avoid the serialization error I break into smaller pieces to avoid    
#problems with large vectors getting transferred between master and slave nodes
numBreaks<-floor(length(data[,1])/50000)
base<-min(length(data[,1]),50000)  #test data sets are < 50K and shouldn't 
                                   #crash the function    piece.in<-data[c(1:base),]         #this is the first batch of records I submit

status<-mpi.parApply(piece.in,1,test) #returns a vector of length 50K
#status<-apply(piece.in,1,test)   #commented out, function runs fine locally using                 
                                  #standard apply function
#cl <- makeSOCKcluster(c("localhost","localhost","localhost","localhost","localhost"))
#status<-parApply(cl,piece.in,1,test) #commented out, function runs fine locally
                                      #using snow's SOCK cluster and identical
                                      #parApply function
if(numBreaks>1){
for (i in 2:numBreaks){
  piece.in<-data[c((base+1):(base+50000)),]
  piece.out<-mpi.parApply(piece.in,1,test)  #returns a vector of length 50K

 #piece.out<-apply(piece.in,1,test)
 #piece.out<-parApply(cl,piece.in,1,test)
  status<-append(status,piece.out)          #add to existing vector
  base<-base+50000                          #increment base
  print(paste("clusters assigned to records",base,"through",(base+50000)))
 }
}

#This piece cleans up the residual records 
if((length(c(base:length(data[,1])))>1) & (base < length(data[,1]))){
  piece.in<-data[c((base+1):length(data[,1])),]
  #piece.out<-apply(piece.in,1,test)
  piece.out<-mpi.parApply(piece.in,1,test)
  #piece.out<-parApply(cl,piece.in,1,test)
  status<-append(status,piece.out)
  print(paste("Final: status has length",length(status)))
 }

此函数在测试数据集(约 100K 记录)上按预期工作。它使用雪或标准应用功能在本地工作(尽管需要一天)。在我将它分解成更小的部分之前,它因“序列化太大......”错误而失败。当我制作 10 万条记录时,它失败并出现同样的错误。它成功运行(如写入文件的打印语句所记录)一直到覆盖记录 130 万到 135 万的迭代,记录分批提交 50K,然后因序列化错误而失败。

我知道还有其他方法可以进行并行处理(我希望在这里使用 foreach 命令),但我受限于对创建并行节点的计算环境一无所知,所以我想留下如果可以的话,用 Rmpi​​。关于如何调试的任何提示?

非常感谢。

克里斯

4

0 回答 0