我正在使用 R 和 Rmpi 中的并行处理工具对我大学的 Unix 集群进行分析。当我使用 snow 包或仅使用标准 apply 函数在本地(Windows)机器上运行下面显示的 R 脚本时,它非常慢,但似乎工作正常。当我使用 Rmpi 在 Unix 集群上运行它时,我收到一个错误“序列化太大而无法存储在原始向量中”。这篇文章似乎是正确的,但我来回传递的向量(50K)不应该接近触发这个错误。此外,我可以通过将数据分成更小的部分来让事情部分发挥作用。我认为问题一定是与 Rmpi 相关的某种内存泄漏,但我没有调试这种东西所需的工具。希望您至少可以为我指明调试这里发生的事情的工具的方向。
我正在使用的数据有大约 250 万条记录,每条记录有大约 40 列。我向 parApply 提交了一个 50K x 40 的数据帧,它返回一个 50K 的数值向量。
#In an effort to avoid the serialization error I break into smaller pieces to avoid
#problems with large vectors getting transferred between master and slave nodes
numBreaks<-floor(length(data[,1])/50000)
base<-min(length(data[,1]),50000) #test data sets are < 50K and shouldn't
#crash the function piece.in<-data[c(1:base),] #this is the first batch of records I submit
status<-mpi.parApply(piece.in,1,test) #returns a vector of length 50K
#status<-apply(piece.in,1,test) #commented out, function runs fine locally using
#standard apply function
#cl <- makeSOCKcluster(c("localhost","localhost","localhost","localhost","localhost"))
#status<-parApply(cl,piece.in,1,test) #commented out, function runs fine locally
#using snow's SOCK cluster and identical
#parApply function
if(numBreaks>1){
for (i in 2:numBreaks){
piece.in<-data[c((base+1):(base+50000)),]
piece.out<-mpi.parApply(piece.in,1,test) #returns a vector of length 50K
#piece.out<-apply(piece.in,1,test)
#piece.out<-parApply(cl,piece.in,1,test)
status<-append(status,piece.out) #add to existing vector
base<-base+50000 #increment base
print(paste("clusters assigned to records",base,"through",(base+50000)))
}
}
#This piece cleans up the residual records
if((length(c(base:length(data[,1])))>1) & (base < length(data[,1]))){
piece.in<-data[c((base+1):length(data[,1])),]
#piece.out<-apply(piece.in,1,test)
piece.out<-mpi.parApply(piece.in,1,test)
#piece.out<-parApply(cl,piece.in,1,test)
status<-append(status,piece.out)
print(paste("Final: status has length",length(status)))
}
此函数在测试数据集(约 100K 记录)上按预期工作。它使用雪或标准应用功能在本地工作(尽管需要一天)。在我将它分解成更小的部分之前,它因“序列化太大......”错误而失败。当我制作 10 万条记录时,它失败并出现同样的错误。它成功运行(如写入文件的打印语句所记录)一直到覆盖记录 130 万到 135 万的迭代,记录分批提交 50K,然后因序列化错误而失败。
我知道还有其他方法可以进行并行处理(我希望在这里使用 foreach 命令),但我受限于对创建并行节点的计算环境一无所知,所以我想留下如果可以的话,用 Rmpi。关于如何调试的任何提示?
非常感谢。
克里斯